diff --git a/CMakeLists.txt b/CMakeLists.txt
index 0677d44..ffe54f5 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -150,6 +150,18 @@ option(BUILD_VISUALIZATION "Build visualization server" ON)
option(USE_TBB "Use Intel TBB for parallel algorithms" OFF)
option(USE_DPDK "Use DPDK for kernel bypass networking" OFF)
option(USE_LOCK_FREE "Use lock-free data structures" ON)
+option(ENABLE_LTO "Enable Link-Time Optimization for release builds" OFF)
+
+# LTO support
+if(ENABLE_LTO)
+ include(CheckIPOSupported)
+ check_ipo_supported(RESULT LTO_SUPPORTED)
+ if(LTO_SUPPORTED)
+ message(STATUS "LTO/IPO enabled")
+ else()
+ message(STATUS "LTO/IPO not supported by this compiler")
+ endif()
+endif()
# Include directories
include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${WEBSOCKETPP_INCLUDE_DIRS}
@@ -191,7 +203,10 @@ set(CORE_SOURCES
core/persistence/journal/Journal.cpp
core/persistence/journal/JournalEntry.cpp
core/persistence/snapshot/SnapshotManager.cpp
- core/routing/OrderRouter.cpp)
+ core/routing/OrderRouter.cpp
+ core/instrument/InstrumentManager.cpp
+ core/instrument/ResourceAllocator.cpp
+ core/utils/ThreadAffinity.cpp)
# Strategy library files
set(STRATEGY_SOURCES
@@ -203,7 +218,10 @@ set(STRATEGY_SOURCES
strategies/analytics/MarketRegimeDetector.cpp
strategies/rl/RLParameterAdapter.cpp
strategies/backtesting/BacktestEngine.cpp
- strategies/config/StrategyConfig.cpp)
+ strategies/config/StrategyConfig.cpp
+ strategies/arbitrage/ArbitrageDetector.cpp
+ strategies/arbitrage/ArbitrageExecutor.cpp
+ strategies/analytics/CrossMarketCorrelation.cpp)
# Exchange library files
set(EXCHANGE_SOURCES
@@ -330,6 +348,11 @@ endif()
target_link_libraries(pinnaclemm ${PINNACLEMM_LIBS})
+# Apply LTO to main executable in release builds
+if(ENABLE_LTO AND LTO_SUPPORTED)
+ set_property(TARGET pinnaclemm PROPERTY INTERPROCEDURAL_OPTIMIZATION TRUE)
+endif()
+
# Tests
if(BUILD_TESTS)
enable_testing()
@@ -488,6 +511,33 @@ if(BUILD_TESTS)
Threads::Threads
Boost::filesystem)
add_test(NAME DisasterRecoveryTests COMMAND disaster_recovery_tests)
+
+ # Instrument Manager tests
+ add_executable(instrument_manager_tests tests/unit/InstrumentManagerTests.cpp)
+ target_link_libraries(
+ instrument_manager_tests
+ core
+ risk
+ strategy
+ exchange
+ GTest::gtest_main
+ GTest::gtest
+ Threads::Threads)
+ add_test(NAME InstrumentManagerTests COMMAND instrument_manager_tests)
+
+ # Arbitrage Detector tests
+ add_executable(arbitrage_detector_tests tests/unit/ArbitrageDetectorTests.cpp)
+ target_link_libraries(arbitrage_detector_tests core strategy
+ GTest::gtest_main GTest::gtest Threads::Threads)
+ add_test(NAME ArbitrageDetectorTests COMMAND arbitrage_detector_tests)
+
+ # Cross-Market Correlation tests
+ add_executable(cross_market_correlation_tests
+ tests/unit/CrossMarketCorrelationTests.cpp)
+ target_link_libraries(cross_market_correlation_tests core strategy
+ GTest::gtest_main GTest::gtest Threads::Threads)
+ add_test(NAME CrossMarketCorrelationTests
+ COMMAND cross_market_correlation_tests)
endif()
# Benchmarks
@@ -558,6 +608,18 @@ if(BUILD_BENCHMARKS)
add_executable(risk_check_benchmark tests/performance/RiskCheckBenchmark.cpp)
target_link_libraries(risk_check_benchmark core risk benchmark::benchmark
Threads::Threads)
+
+ # Multi-instrument benchmarks
+ add_executable(multi_instrument_benchmark
+ tests/performance/MultiInstrumentBenchmark.cpp)
+ target_link_libraries(
+ multi_instrument_benchmark
+ core
+ risk
+ strategy
+ exchange
+ benchmark::benchmark
+ Threads::Threads)
endif()
# Install targets
diff --git a/README.md b/README.md
index 38d55a5..4889eda 100644
--- a/README.md
+++ b/README.md
@@ -21,7 +21,8 @@
Order Routing •
Performance Benchmarks •
API Reference •
- Exchange Connectors
+ Exchange Connectors •
+ Multi-Instrument
@@ -52,6 +53,11 @@ PinnacleMM is a high-performance, production-grade market making system designed
- **Disaster Recovery**: Atomic risk state persistence, position reconciliation, and labeled backup management
- **Kubernetes Deployment**: Production-ready StatefulSet with health probes, PVC, network policies, and pod disruption budget
- **Enterprise Security**: AES-256-CBC encryption with unique salts, 100,000 PBKDF2 iterations, secure password input, comprehensive input validation, audit logging, rate limiting, and certificate pinning
+- **Multi-Instrument Trading**: Simultaneous trading across multiple symbols with `InstrumentManager` orchestration
+- **Cross-Exchange Arbitrage**: Venue price discrepancy detection with fee-adjusted opportunity scanning and dry-run execution
+- **Cross-Market Correlation**: Pearson/rolling correlation, lead-lag analysis, Engle-Granger cointegration, and signal-based spread adjustment
+- **Per-Symbol Risk Tracking**: Atomic per-symbol position, PnL, and volume tracking with configurable per-symbol limits
+- **Dynamic Resource Allocation**: CPU core distribution and thread pinning for multi-instrument deployments
- **Comprehensive Testing**: Extensive test suite ensuring reliability and performance
## System Architecture
@@ -161,6 +167,12 @@ cd build && ./pinnaclemm --mode simulation --enable-ml --json-log --json-log-fil
# Combined: ML + visualization + JSON logging
cd build && ./pinnaclemm --mode simulation --enable-ml --enable-visualization --json-log --json-log-file sim_ml_data.jsonl
+# Multi-instrument simulation
+cd build && ./pinnaclemm --mode simulation --symbols BTC-USD,ETH-USD
+
+# Arbitrage detection (dry-run)
+cd build && ./pinnaclemm --mode simulation --symbol BTC-USD --enable-arbitrage --arb-dry-run
+
# The visualization dashboard will be available at:
# - WebSocket: ws://localhost:8080 (or custom port with --viz-ws-port)
# - REST API: http://localhost:8081 (or custom port with --viz-api-port)
@@ -453,6 +465,9 @@ docker run -it --name pinnaclemm-live \
- **Market Regime Detector**: Hidden Markov Model-based detection of 8 market regimes
- **RL Parameter Adapter**: Reinforcement learning for dynamic strategy parameter optimization
- **Advanced Backtesting Engine**: Historical replay with Monte Carlo analysis and statistical testing
+- **Instrument Manager**: Multi-instrument orchestration with per-symbol order books, strategies, and simulators
+- **Arbitrage Detector**: Cross-exchange price discrepancy detection with fee-adjusted scanning
+- **Cross-Market Correlation**: Statistical lead-lag analysis and cointegration testing for signal generation
- **Real-Time Visualization**: Web-based dashboard with Chart.js and D3.js visualization
- **FIX Protocol Engine**: Professional-grade FIX connectivity for institutional trading
- **Persistence System**: Crash recovery with memory-mapped files
@@ -548,6 +563,12 @@ open build/test_dashboard.html
# or manually: file:///path/to/PinnacleMM/build/test_dashboard.html
```
+### Multi-Instrument & Optimization
+- [Multi-Instrument Guide](docs/MULTI_INSTRUMENT_GUIDE.md) - **Multi-symbol trading with InstrumentManager**
+- [Cross-Exchange Arbitrage](docs/CROSS_EXCHANGE_ARBITRAGE.md) - **Venue spread detection and execution**
+- [Cross-Market Correlation](docs/CROSS_MARKET_CORRELATION.md) - **Statistical correlation and signal generation**
+- [Performance Optimization Guide](docs/PERFORMANCE_OPTIMIZATION_GUIDE.md) - **LTO, CPU affinity, object pooling, lock-free fixes**
+
### Exchange Integration
- [FIX Protocol Integration Guide](docs/FIX_PROTOCOL_INTEGRATION.md)
- [FIX Testing Guide](docs/TESTING_GUIDE.md)
@@ -621,6 +642,12 @@ cd build
./disaster_recovery_tests # 8 tests - state persistence, backups
./risk_check_benchmark # Risk check latency benchmarks
+# Test multi-instrument and optimization components (Phase 5)
+./instrument_manager_tests # 9 tests - lifecycle management
+./arbitrage_detector_tests # 8 tests - opportunity detection, fees
+./cross_market_correlation_tests # 7 tests - correlation, lead-lag
+./multi_instrument_benchmark # Startup scaling benchmarks
+
# Memory safety validation with Address Sanitizer (development builds)
cmake -DCMAKE_BUILD_TYPE=Debug -DENABLE_SANITIZERS=ON .. && make -j8
./pinnaclemm --mode simulation --symbol BTC-USD --verbose
diff --git a/config/default_config.json b/config/default_config.json
index f61843f..261db19 100644
--- a/config/default_config.json
+++ b/config/default_config.json
@@ -60,6 +60,30 @@
"keepSnapshots": 5,
"compactionThreshold": 1000000
},
+ "instruments": [
+ {
+ "symbol": "BTC-USD",
+ "enabled": true,
+ "useLockFree": true,
+ "enableML": false,
+ "baseSpreadBps": 10.0,
+ "orderQuantity": 0.01,
+ "maxPosition": 10.0
+ }
+ ],
+ "arbitrage": {
+ "enabled": false,
+ "minSpreadBps": 5.0,
+ "minProfitUsd": 1.0,
+ "maxStalenessMs": 500,
+ "scanIntervalMs": 10,
+ "dryRun": true,
+ "venues": ["coinbase", "kraken"],
+ "venueFees": {
+ "coinbase": 0.001,
+ "kraken": 0.0016
+ }
+ },
"risk_management": {
"limits": {
"max_position_size": 10.0,
diff --git a/core/instrument/InstrumentManager.cpp b/core/instrument/InstrumentManager.cpp
new file mode 100644
index 0000000..878ff98
--- /dev/null
+++ b/core/instrument/InstrumentManager.cpp
@@ -0,0 +1,398 @@
+#include "InstrumentManager.h"
+#include "../../strategies/basic/MLEnhancedMarketMaker.h"
+#include "../persistence/PersistenceManager.h"
+
+#include
+#include
+
+namespace pinnacle {
+namespace instrument {
+
+InstrumentManager::~InstrumentManager() { stopAll(); }
+
+bool InstrumentManager::addInstrument(const InstrumentConfig& config,
+ const std::string& mode) {
+ std::lock_guard lock(m_mutex);
+
+ if (m_instruments.count(config.symbol)) {
+ spdlog::warn("Instrument {} already registered", config.symbol);
+ return false;
+ }
+
+ auto ctx = std::make_shared();
+ ctx->symbol = config.symbol;
+ ctx->config = config;
+
+ // Try to recover order book from persistence
+ auto& persistenceManager = persistence::PersistenceManager::getInstance();
+ ctx->orderBook = persistenceManager.getRecoveredOrderBook(config.symbol);
+
+ if (ctx->orderBook) {
+ spdlog::info("Using recovered order book for {} with {} existing orders",
+ config.symbol, ctx->orderBook->getOrderCount());
+ } else {
+ if (config.useLockFree) {
+ ctx->orderBook = std::make_shared(config.symbol);
+ spdlog::info("[{}] Using lock-free order book", config.symbol);
+ } else {
+ ctx->orderBook = std::make_shared(config.symbol);
+ spdlog::info("[{}] Using mutex-based order book", config.symbol);
+ }
+ }
+
+ // Create strategy
+ strategy::StrategyConfig stratConfig;
+ stratConfig.symbol = config.symbol;
+ stratConfig.baseSpreadBps = config.baseSpreadBps;
+ stratConfig.orderQuantity = config.orderQuantity;
+ stratConfig.maxPosition = config.maxPosition;
+
+ if (config.enableML) {
+ strategy::MLEnhancedMarketMaker::MLConfig mlConfig{};
+ mlConfig.enableMLSpreadOptimization = true;
+ mlConfig.enableOnlineLearning = true;
+ mlConfig.fallbackToHeuristics = true;
+ mlConfig.mlConfidenceThreshold = 0.5;
+
+ ctx->strategy = std::make_shared(
+ config.symbol, stratConfig, mlConfig);
+ spdlog::info("[{}] Using ML-enhanced market maker", config.symbol);
+ } else {
+ ctx->strategy = std::make_shared(config.symbol,
+ stratConfig);
+ spdlog::info("[{}] Using basic market maker", config.symbol);
+ }
+
+ // Create simulator for non-live modes
+ if (mode != "live") {
+ ctx->simulator =
+ std::make_shared(ctx->orderBook);
+ }
+
+ m_instruments.emplace(config.symbol, std::move(ctx));
+ spdlog::info("Instrument {} added (mode={})", config.symbol, mode);
+ return true;
+}
+
+bool InstrumentManager::removeInstrument(const std::string& symbol) {
+ // Extract context under lock, then stop outside lock
+ std::shared_ptr ctx;
+ {
+ std::lock_guard lock(m_mutex);
+ auto it = m_instruments.find(symbol);
+ if (it == m_instruments.end()) {
+ spdlog::warn("Instrument {} not found for removal", symbol);
+ return false;
+ }
+ ctx = it->second;
+ m_instruments.erase(it);
+ }
+
+ // Stop components outside the lock (these may block)
+ if (ctx->strategy && ctx->strategy->isRunning()) {
+ ctx->strategy->stop();
+ }
+ if (ctx->simulator && ctx->simulator->isRunning()) {
+ ctx->simulator->stop();
+ }
+
+ spdlog::info("Instrument {} removed", symbol);
+ return true;
+}
+
+bool InstrumentManager::startAll() {
+ // Collect symbols to start under lock
+ std::vector toStart;
+ {
+ std::lock_guard lock(m_mutex);
+ for (auto& [symbol, ctx] : m_instruments) {
+ if (!ctx->running) {
+ toStart.push_back(symbol);
+ }
+ }
+ }
+
+ bool allOk = true;
+ for (const auto& symbol : toStart) {
+ // Get components under lock
+ std::shared_ptr strategy;
+ std::shared_ptr simulator;
+ std::shared_ptr orderBook;
+ {
+ std::lock_guard lock(m_mutex);
+ auto it = m_instruments.find(symbol);
+ if (it == m_instruments.end() || it->second->running) {
+ continue;
+ }
+ strategy = it->second->strategy;
+ simulator = it->second->simulator;
+ orderBook = it->second->orderBook;
+ }
+
+ if (!strategy) {
+ spdlog::error("[{}] Strategy is null", symbol);
+ allOk = false;
+ continue;
+ }
+
+ // Blocking calls outside lock
+ if (!strategy->initialize(orderBook)) {
+ spdlog::error("[{}] Failed to initialize strategy", symbol);
+ allOk = false;
+ continue;
+ }
+
+ if (!strategy->start()) {
+ spdlog::error("[{}] Failed to start strategy", symbol);
+ allOk = false;
+ continue;
+ }
+
+ if (simulator) {
+ if (!simulator->start()) {
+ spdlog::error("[{}] Failed to start simulator", symbol);
+ strategy->stop();
+ allOk = false;
+ continue;
+ }
+ }
+
+ // Mark running under lock
+ {
+ std::lock_guard lock(m_mutex);
+ auto it = m_instruments.find(symbol);
+ if (it != m_instruments.end()) {
+ it->second->running = true;
+ spdlog::info("[{}] Instrument started", symbol);
+ }
+ }
+ }
+
+ return allOk;
+}
+
+bool InstrumentManager::stopAll() {
+ // Collect contexts to stop under lock
+ std::vector>>
+ toStop;
+ {
+ std::lock_guard lock(m_mutex);
+ for (auto& [symbol, ctx] : m_instruments) {
+ if (ctx->running) {
+ toStop.emplace_back(symbol, ctx);
+ }
+ }
+ }
+
+ bool allOk = true;
+ for (auto& [symbol, ctx] : toStop) {
+ // Blocking calls outside lock
+ if (ctx->strategy && ctx->strategy->isRunning()) {
+ if (!ctx->strategy->stop()) {
+ spdlog::error("[{}] Failed to stop strategy", symbol);
+ allOk = false;
+ }
+ }
+
+ if (ctx->simulator && ctx->simulator->isRunning()) {
+ if (!ctx->simulator->stop()) {
+ spdlog::error("[{}] Failed to stop simulator", symbol);
+ allOk = false;
+ }
+ }
+
+ // Mark stopped under lock
+ {
+ std::lock_guard lock(m_mutex);
+ auto it = m_instruments.find(symbol);
+ if (it != m_instruments.end()) {
+ it->second->running = false;
+ spdlog::info("[{}] Instrument stopped", symbol);
+ }
+ }
+ }
+
+ return allOk;
+}
+
+bool InstrumentManager::startInstrument(const std::string& symbol) {
+ // Get components under lock
+ std::shared_ptr strategy;
+ std::shared_ptr simulator;
+ std::shared_ptr orderBook;
+ {
+ std::lock_guard lock(m_mutex);
+ auto it = m_instruments.find(symbol);
+ if (it == m_instruments.end()) {
+ return false;
+ }
+ if (it->second->running) {
+ return true;
+ }
+ strategy = it->second->strategy;
+ simulator = it->second->simulator;
+ orderBook = it->second->orderBook;
+ }
+
+ if (!strategy) {
+ spdlog::error("[{}] Strategy is null", symbol);
+ return false;
+ }
+
+ // Blocking calls outside lock
+ if (!strategy->initialize(orderBook)) {
+ spdlog::error("[{}] Failed to initialize strategy", symbol);
+ return false;
+ }
+ if (!strategy->start()) {
+ spdlog::error("[{}] Failed to start strategy", symbol);
+ return false;
+ }
+ if (simulator && !simulator->start()) {
+ spdlog::error("[{}] Failed to start simulator", symbol);
+ strategy->stop();
+ return false;
+ }
+
+ // Mark running under lock
+ {
+ std::lock_guard lock(m_mutex);
+ auto it = m_instruments.find(symbol);
+ if (it != m_instruments.end()) {
+ it->second->running = true;
+ }
+ }
+ return true;
+}
+
+bool InstrumentManager::stopInstrument(const std::string& symbol) {
+ // Get components under lock
+ std::shared_ptr strategy;
+ std::shared_ptr simulator;
+ {
+ std::lock_guard lock(m_mutex);
+ auto it = m_instruments.find(symbol);
+ if (it == m_instruments.end()) {
+ return false;
+ }
+ if (!it->second->running) {
+ return true;
+ }
+ strategy = it->second->strategy;
+ simulator = it->second->simulator;
+ }
+
+ // Blocking calls outside lock
+ if (strategy && strategy->isRunning()) {
+ strategy->stop();
+ }
+ if (simulator && simulator->isRunning()) {
+ simulator->stop();
+ }
+
+ // Mark stopped under lock
+ {
+ std::lock_guard lock(m_mutex);
+ auto it = m_instruments.find(symbol);
+ if (it != m_instruments.end()) {
+ it->second->running = false;
+ }
+ }
+ return true;
+}
+
+std::shared_ptr
+InstrumentManager::getContext(const std::string& symbol) {
+ std::lock_guard lock(m_mutex);
+ auto it = m_instruments.find(symbol);
+ if (it == m_instruments.end()) {
+ return nullptr;
+ }
+ return it->second;
+}
+
+std::shared_ptr
+InstrumentManager::getContext(const std::string& symbol) const {
+ std::lock_guard lock(m_mutex);
+ auto it = m_instruments.find(symbol);
+ if (it == m_instruments.end()) {
+ return nullptr;
+ }
+ return it->second;
+}
+
+std::vector InstrumentManager::getSymbols() const {
+ std::lock_guard lock(m_mutex);
+ std::vector symbols;
+ symbols.reserve(m_instruments.size());
+ for (const auto& [sym, _] : m_instruments) {
+ symbols.push_back(sym);
+ }
+ return symbols;
+}
+
+size_t InstrumentManager::getInstrumentCount() const {
+ std::lock_guard lock(m_mutex);
+ return m_instruments.size();
+}
+
+bool InstrumentManager::hasInstrument(const std::string& symbol) const {
+ std::lock_guard lock(m_mutex);
+ return m_instruments.count(symbol) > 0;
+}
+
+std::string InstrumentManager::getAggregateStatistics() const {
+ // Take a snapshot of contexts under lock, then format without lock
+ std::vector> contexts;
+ {
+ std::lock_guard lock(m_mutex);
+ contexts.reserve(m_instruments.size());
+ for (const auto& [symbol, ctx] : m_instruments) {
+ contexts.push_back(ctx);
+ }
+ }
+
+ std::ostringstream oss;
+ double totalPnL = 0.0;
+ double totalPosition = 0.0;
+ size_t totalOrders = 0;
+
+ for (const auto& ctx : contexts) {
+ oss << "--- " << ctx->symbol << " ---\n";
+
+ if (ctx->orderBook) {
+ oss << " Best bid: " << ctx->orderBook->getBestBidPrice() << "\n";
+ oss << " Best ask: " << ctx->orderBook->getBestAskPrice() << "\n";
+ oss << " Mid price: " << ctx->orderBook->getMidPrice() << "\n";
+ oss << " Spread: " << ctx->orderBook->getSpread() << "\n";
+ oss << " Order count: " << ctx->orderBook->getOrderCount() << "\n";
+ totalOrders += ctx->orderBook->getOrderCount();
+ }
+
+ if (ctx->strategy) {
+ oss << ctx->strategy->getStatistics() << "\n";
+ totalPnL += ctx->strategy->getPnL();
+ totalPosition += ctx->strategy->getPosition();
+ }
+ }
+
+ oss << "--- AGGREGATE ---\n";
+ oss << " Instruments: " << contexts.size() << "\n";
+ oss << " Total PnL: " << totalPnL << "\n";
+ oss << " Total Position: " << totalPosition << "\n";
+ oss << " Total Orders: " << totalOrders << "\n";
+
+ return oss.str();
+}
+
+void InstrumentManager::createCheckpoints() {
+ std::lock_guard lock(m_mutex);
+ for (auto& [symbol, ctx] : m_instruments) {
+ if (ctx->orderBook) {
+ ctx->orderBook->createCheckpoint();
+ }
+ }
+}
+
+} // namespace instrument
+} // namespace pinnacle
diff --git a/core/instrument/InstrumentManager.h b/core/instrument/InstrumentManager.h
new file mode 100644
index 0000000..1d60132
--- /dev/null
+++ b/core/instrument/InstrumentManager.h
@@ -0,0 +1,157 @@
+#pragma once
+
+#include "../../exchange/simulator/ExchangeSimulator.h"
+#include "../../strategies/basic/BasicMarketMaker.h"
+#include "../../strategies/config/StrategyConfig.h"
+#include "../orderbook/LockFreeOrderBook.h"
+#include "../orderbook/OrderBook.h"
+
+#include
+#include
+#include
+#include
+#include
+
+namespace pinnacle {
+namespace instrument {
+
+/**
+ * @struct InstrumentConfig
+ * @brief Configuration for a single instrument
+ */
+struct InstrumentConfig {
+ std::string symbol{"BTC-USD"};
+ bool useLockFree{true};
+ bool enableML{false};
+ double baseSpreadBps{10.0};
+ double orderQuantity{0.01};
+ double maxPosition{10.0};
+};
+
+/**
+ * @struct InstrumentContext
+ * @brief Holds all per-instrument components
+ */
+struct InstrumentContext {
+ std::string symbol;
+ std::shared_ptr orderBook;
+ std::shared_ptr strategy;
+ std::shared_ptr simulator; // null in live mode
+ InstrumentConfig config;
+ bool running{false};
+};
+
+/**
+ * @class InstrumentManager
+ * @brief Central class to manage multiple {symbol, orderbook, strategy,
+ * simulator} tuples
+ *
+ * Manages the lifecycle of per-instrument components. Supports adding/removing
+ * instruments at runtime. Keeps single-symbol mode working for backward
+ * compatibility.
+ *
+ * Contexts are stored as shared_ptr so that getContext() returns a safe handle
+ * that remains valid even if the map is modified concurrently.
+ */
+class InstrumentManager {
+public:
+ InstrumentManager() = default;
+ ~InstrumentManager();
+
+ InstrumentManager(const InstrumentManager&) = delete;
+ InstrumentManager& operator=(const InstrumentManager&) = delete;
+
+ /**
+ * @brief Add an instrument with its own orderbook, strategy, and simulator
+ * @param config Instrument configuration
+ * @param mode Operating mode ("simulation", "live", "backtest")
+ * @return true if the instrument was added successfully
+ */
+ bool addInstrument(const InstrumentConfig& config, const std::string& mode);
+
+ /**
+ * @brief Remove an instrument by symbol (stops it first)
+ * @param symbol Trading symbol to remove
+ * @return true if the instrument was removed successfully
+ */
+ bool removeInstrument(const std::string& symbol);
+
+ /**
+ * @brief Start all registered instruments
+ * @return true if all instruments started successfully
+ */
+ bool startAll();
+
+ /**
+ * @brief Stop all registered instruments
+ * @return true if all instruments stopped successfully
+ */
+ bool stopAll();
+
+ /**
+ * @brief Start a specific instrument
+ * @param symbol Trading symbol
+ * @return true if started successfully
+ */
+ bool startInstrument(const std::string& symbol);
+
+ /**
+ * @brief Stop a specific instrument
+ * @param symbol Trading symbol
+ * @return true if stopped successfully
+ */
+ bool stopInstrument(const std::string& symbol);
+
+ /**
+ * @brief Get the context for a specific instrument
+ * @param symbol Trading symbol
+ * @return shared_ptr to InstrumentContext, or nullptr if not found
+ */
+ std::shared_ptr getContext(const std::string& symbol);
+
+ /**
+ * @brief Get the context for a specific instrument (const version)
+ * @param symbol Trading symbol
+ * @return shared_ptr to const InstrumentContext, or nullptr if not found
+ */
+ std::shared_ptr
+ getContext(const std::string& symbol) const;
+
+ /**
+ * @brief Get all registered symbols
+ * @return Vector of symbol strings
+ */
+ std::vector getSymbols() const;
+
+ /**
+ * @brief Get number of registered instruments
+ * @return Count of instruments
+ */
+ size_t getInstrumentCount() const;
+
+ /**
+ * @brief Check if a symbol is registered
+ * @param symbol Trading symbol
+ * @return true if the symbol is registered
+ */
+ bool hasInstrument(const std::string& symbol) const;
+
+ /**
+ * @brief Get aggregate statistics across all instruments
+ * @return Formatted statistics string
+ */
+ std::string getAggregateStatistics() const;
+
+ /**
+ * @brief Create checkpoints for all order books
+ */
+ void createCheckpoints();
+
+private:
+ mutable std::mutex m_mutex;
+ std::unordered_map>
+ m_instruments;
+};
+
+} // namespace instrument
+} // namespace pinnacle
diff --git a/core/instrument/ResourceAllocator.cpp b/core/instrument/ResourceAllocator.cpp
new file mode 100644
index 0000000..0f6ef84
--- /dev/null
+++ b/core/instrument/ResourceAllocator.cpp
@@ -0,0 +1,84 @@
+#include "ResourceAllocator.h"
+
+#include
+#include
+
+namespace pinnacle {
+namespace instrument {
+
+std::unordered_map
+ResourceAllocator::allocate(const std::vector& symbols) const {
+ std::unordered_map assignments;
+
+ int numCores = getAvailableCores();
+ int numInstruments = static_cast(symbols.size());
+
+ if (numInstruments == 0) {
+ return assignments;
+ }
+
+ if (numCores <= 0) {
+ spdlog::error("Invalid core count {}; defaulting to 1", numCores);
+ numCores = 1;
+ }
+
+ // Reserve core 0 for OS / main thread
+ int availableCores = std::max(1, numCores - 1);
+
+ // Each instrument ideally needs 2 cores (strategy + simulator)
+ int coresPerInstrument = std::max(1, availableCores / numInstruments);
+
+ int nextCore = 1; // Start from core 1
+
+ for (int i = 0; i < numInstruments; ++i) {
+ CoreAssignment assignment;
+ assignment.symbol = symbols[i];
+
+ // Assign strategy core (stay in range [1, numCores))
+ assignment.strategyCore = 1 + ((nextCore - 1) % availableCores);
+ nextCore++;
+
+ // Assign simulator core if we have enough cores
+ if (coresPerInstrument >= 2) {
+ assignment.simulatorCore = 1 + ((nextCore - 1) % availableCores);
+ nextCore++;
+ } else {
+ // Share core with strategy
+ assignment.simulatorCore = assignment.strategyCore;
+ }
+
+ // Higher priority for instruments listed first
+ assignment.priority = numInstruments - i;
+
+ assignments[symbols[i]] = assignment;
+
+ spdlog::info("[{}] Core assignment: strategy={} simulator={} priority={}",
+ symbols[i], assignment.strategyCore, assignment.simulatorCore,
+ assignment.priority);
+ }
+
+ return assignments;
+}
+
+int ResourceAllocator::getAvailableCores() const {
+ return utils::ThreadAffinity::getNumCores();
+}
+
+bool ResourceAllocator::applyAssignment(const CoreAssignment& assignment,
+ bool isStrategy) {
+ int core = isStrategy ? assignment.strategyCore : assignment.simulatorCore;
+ if (core < 0) {
+ return false;
+ }
+
+ bool result = utils::ThreadAffinity::pinToCore(core);
+
+ std::string threadType = isStrategy ? "strategy" : "simulator";
+ std::string threadName = assignment.symbol + "_" + threadType;
+ utils::ThreadAffinity::setThreadName(threadName);
+
+ return result;
+}
+
+} // namespace instrument
+} // namespace pinnacle
diff --git a/core/instrument/ResourceAllocator.h b/core/instrument/ResourceAllocator.h
new file mode 100644
index 0000000..484f480
--- /dev/null
+++ b/core/instrument/ResourceAllocator.h
@@ -0,0 +1,60 @@
+#pragma once
+
+#include "../utils/ThreadAffinity.h"
+
+#include
+#include
+#include
+
+namespace pinnacle {
+namespace instrument {
+
+/**
+ * @struct CoreAssignment
+ * @brief CPU core assignment for an instrument
+ */
+struct CoreAssignment {
+ std::string symbol;
+ int strategyCore{-1}; // Core for the strategy thread
+ int simulatorCore{-1}; // Core for the simulator thread
+ int priority{0}; // Thread priority hint (0 = normal)
+};
+
+/**
+ * @class ResourceAllocator
+ * @brief Assigns CPU cores and priorities to instruments based on count and
+ * available hardware
+ *
+ * Used by InstrumentManager on startup to distribute instruments across
+ * available cores for optimal performance.
+ */
+class ResourceAllocator {
+public:
+ ResourceAllocator() = default;
+
+ /**
+ * @brief Allocate cores for a set of instruments
+ * @param symbols List of instrument symbols
+ * @return Map of symbol -> CoreAssignment
+ */
+ std::unordered_map
+ allocate(const std::vector& symbols) const;
+
+ /**
+ * @brief Get available core count
+ * @return Number of hardware threads available
+ */
+ int getAvailableCores() const;
+
+ /**
+ * @brief Apply a core assignment to the calling thread
+ * @param assignment The assignment to apply
+ * @param isStrategy true for strategy thread, false for simulator
+ * @return true if affinity was set successfully
+ */
+ static bool applyAssignment(const CoreAssignment& assignment,
+ bool isStrategy);
+};
+
+} // namespace instrument
+} // namespace pinnacle
diff --git a/core/risk/RiskConfig.h b/core/risk/RiskConfig.h
index 702404a..9fc6081 100644
--- a/core/risk/RiskConfig.h
+++ b/core/risk/RiskConfig.h
@@ -36,6 +36,19 @@ struct RiskLimits {
uint32_t maxOrdersPerSecond{100};
};
+/**
+ * @struct PerSymbolLimits
+ * @brief Per-symbol risk limits (overrides global if set)
+ */
+struct PerSymbolLimits {
+ std::string symbol;
+ double maxPositionSize{0.0}; // 0 = use global
+ double maxDailyVolume{0.0}; // 0 = use global
+ double dailyLossLimit{0.0}; // 0 = use global
+ double maxOrderSize{0.0}; // 0 = use global
+ double maxNotionalExposure{0.0}; // 0 = use global
+};
+
/**
* @struct CircuitBreakerConfig
* @brief Configuration for the circuit breaker
@@ -96,6 +109,7 @@ struct RiskConfig {
CircuitBreakerConfig circuitBreaker;
VaRConfig var;
AlertConfig alerts;
+ std::vector perSymbolLimits;
/**
* @brief Load risk configuration from JSON
@@ -186,6 +200,22 @@ struct RiskConfig {
config.alerts.criticalThresholdPct = al.value(
"critical_threshold_pct", config.alerts.criticalThresholdPct);
}
+
+ if (rm.contains("per_symbol_limits") &&
+ rm["per_symbol_limits"].is_array()) {
+ for (const auto& psl : rm["per_symbol_limits"]) {
+ PerSymbolLimits limits;
+ limits.symbol = psl.value("symbol", std::string{});
+ limits.maxPositionSize = psl.value("max_position_size", 0.0);
+ limits.maxDailyVolume = psl.value("max_daily_volume", 0.0);
+ limits.dailyLossLimit = psl.value("daily_loss_limit", 0.0);
+ limits.maxOrderSize = psl.value("max_order_size", 0.0);
+ limits.maxNotionalExposure = psl.value("max_notional_exposure", 0.0);
+ if (!limits.symbol.empty()) {
+ config.perSymbolLimits.push_back(limits);
+ }
+ }
+ }
}
return config;
@@ -195,7 +225,7 @@ struct RiskConfig {
* @brief Serialize to JSON
*/
nlohmann::json toJson() const {
- return {
+ nlohmann::json result = {
{"risk_management",
{{"limits",
{{"max_position_size", limits.maxPositionSize},
@@ -233,6 +263,22 @@ struct RiskConfig {
{"max_history", alerts.maxAlertHistory},
{"warning_threshold_pct", alerts.warningThresholdPct},
{"critical_threshold_pct", alerts.criticalThresholdPct}}}}}};
+
+ // Add per-symbol limits
+ nlohmann::json pslArray = nlohmann::json::array();
+ for (const auto& psl : perSymbolLimits) {
+ pslArray.push_back({{"symbol", psl.symbol},
+ {"max_position_size", psl.maxPositionSize},
+ {"max_daily_volume", psl.maxDailyVolume},
+ {"daily_loss_limit", psl.dailyLossLimit},
+ {"max_order_size", psl.maxOrderSize},
+ {"max_notional_exposure", psl.maxNotionalExposure}});
+ }
+ if (!pslArray.empty()) {
+ result["risk_management"]["per_symbol_limits"] = pslArray;
+ }
+
+ return result;
}
};
diff --git a/core/risk/RiskManager.cpp b/core/risk/RiskManager.cpp
index 659db91..06dd0d4 100644
--- a/core/risk/RiskManager.cpp
+++ b/core/risk/RiskManager.cpp
@@ -2,6 +2,7 @@
#include "../utils/AuditLogger.h"
#include
+#include
#include
namespace pinnacle {
@@ -54,6 +55,13 @@ void RiskManager::initialize(const RiskLimits& limits) {
m_ordersThisSecond.store(0, std::memory_order_relaxed);
m_currentSecond.store(0, std::memory_order_relaxed);
+ // Clear per-symbol state so tests and re-initialization start fresh
+ {
+ std::unique_lock lock(m_symbolMutex);
+ m_symbolStates.clear();
+ m_symbolLimits.clear();
+ }
+
spdlog::info("RiskManager initialized - maxPos={} maxOrderSize={} "
"dailyLossLimit={} maxDrawdown={}%",
limits.maxPositionSize, limits.maxOrderSize,
@@ -114,11 +122,30 @@ RiskCheckResult RiskManager::checkOrder(OrderSide side, double price,
return RiskCheckResult::REJECTED_ORDER_SIZE_LIMIT;
}
- // 4. Position limit check
+ // 4. Position limit check (per-symbol if registered, else global)
double currentPos = m_position.load(std::memory_order_relaxed);
double projectedPos = (side == OrderSide::BUY) ? (currentPos + quantity)
: (currentPos - quantity);
double maxPos = m_limits.maxPositionSize;
+
+ // Check per-symbol position limit if available
+ {
+ std::shared_lock lock(m_symbolMutex);
+ auto limIt = m_symbolLimits.find(symbol);
+ auto stateIt = m_symbolStates.find(symbol);
+ if (limIt != m_symbolLimits.end() && limIt->second.maxPositionSize > 0.0 &&
+ stateIt != m_symbolStates.end()) {
+ double symPos = stateIt->second->position.load(std::memory_order_relaxed);
+ double symProjected =
+ (side == OrderSide::BUY) ? (symPos + quantity) : (symPos - quantity);
+ if (std::abs(symProjected) > limIt->second.maxPositionSize) {
+ AUDIT_ORDER_ACTIVITY("system", "", "rejected_position_limit", symbol,
+ false);
+ return RiskCheckResult::REJECTED_POSITION_LIMIT;
+ }
+ }
+ }
+
if (std::abs(projectedPos) > maxPos) {
AUDIT_ORDER_ACTIVITY("system", "", "rejected_position_limit", symbol,
false);
@@ -213,6 +240,36 @@ void RiskManager::onFill(OrderSide side, double price, double quantity,
m_netExposure.store(net, std::memory_order_release);
}
+ // Update per-symbol state if registered (grow-only map — pointer is stable)
+ {
+ std::shared_lock lock(m_symbolMutex);
+ auto symIt = m_symbolStates.find(symbol);
+ if (symIt != m_symbolStates.end()) {
+ auto& ss = *symIt->second;
+
+ double sPrev = ss.position.load(std::memory_order_relaxed);
+ double sNew;
+ do {
+ sNew = sPrev + delta;
+ } while (!ss.position.compare_exchange_weak(
+ sPrev, sNew, std::memory_order_release, std::memory_order_relaxed));
+
+ double vPrev = ss.dailyVolume.load(std::memory_order_relaxed);
+ double vNew;
+ do {
+ vNew = vPrev + quantity;
+ } while (!ss.dailyVolume.compare_exchange_weak(
+ vPrev, vNew, std::memory_order_release, std::memory_order_relaxed));
+
+ double ePrev = ss.exposure.load(std::memory_order_relaxed);
+ double eNew;
+ do {
+ eNew = ePrev + notional;
+ } while (!ss.exposure.compare_exchange_weak(
+ ePrev, eNew, std::memory_order_release, std::memory_order_relaxed));
+ }
+ }
+
spdlog::debug("Fill: {} {} {} @ {} | pos={} vol={} notional={}",
(side == OrderSide::BUY) ? "BUY" : "SELL", quantity, symbol,
price, newPos, newVol, notional);
@@ -567,6 +624,53 @@ void RiskManager::checkDailyReset() {
}
}
+// ---------------------------------------------------------------------------
+// Per-symbol tracking
+// ---------------------------------------------------------------------------
+void RiskManager::registerSymbol(const std::string& symbol) {
+ std::unique_lock lock(m_symbolMutex);
+ if (m_symbolStates.count(symbol)) {
+ return; // already registered
+ }
+ m_symbolStates[symbol] = std::make_shared(symbol);
+ spdlog::info("Registered per-symbol risk tracking for {}", symbol);
+}
+
+SymbolRiskState* RiskManager::getSymbolState(const std::string& symbol) {
+ std::shared_lock lock(m_symbolMutex);
+ auto it = m_symbolStates.find(symbol);
+ if (it != m_symbolStates.end()) {
+ return it->second.get();
+ }
+ return nullptr;
+}
+
+const SymbolRiskState*
+RiskManager::getSymbolState(const std::string& symbol) const {
+ std::shared_lock lock(m_symbolMutex);
+ auto it = m_symbolStates.find(symbol);
+ if (it != m_symbolStates.end()) {
+ return it->second.get();
+ }
+ return nullptr;
+}
+
+void RiskManager::setSymbolLimits(const PerSymbolLimits& limits) {
+ std::unique_lock lock(m_symbolMutex);
+ m_symbolLimits[limits.symbol] = limits;
+ spdlog::info("Set per-symbol limits for {}", limits.symbol);
+}
+
+const PerSymbolLimits*
+RiskManager::getSymbolLimits(const std::string& symbol) const {
+ std::shared_lock lock(m_symbolMutex);
+ auto it = m_symbolLimits.find(symbol);
+ if (it != m_symbolLimits.end()) {
+ return &it->second;
+ }
+ return nullptr;
+}
+
// ---------------------------------------------------------------------------
// Utility
// ---------------------------------------------------------------------------
diff --git a/core/risk/RiskManager.h b/core/risk/RiskManager.h
index e2663a3..69bbd75 100644
--- a/core/risk/RiskManager.h
+++ b/core/risk/RiskManager.h
@@ -7,9 +7,12 @@
#include
#include
#include
+#include
#include
+#include
#include
#include
+#include
namespace pinnacle {
namespace risk {
@@ -52,6 +55,28 @@ struct RiskState {
uint64_t currentSecond{0};
};
+/**
+ * @struct SymbolRiskState
+ * @brief Per-symbol atomic tracking for position, PnL, and volume
+ *
+ * Uses atomics so the hot path can read without locks.
+ * The map holding these is grow-only so readers are safe.
+ */
+struct SymbolRiskState {
+ std::string symbol;
+ std::atomic position{0.0};
+ std::atomic dailyPnL{0.0};
+ std::atomic dailyVolume{0.0};
+ std::atomic exposure{0.0};
+
+ SymbolRiskState() = default;
+ explicit SymbolRiskState(const std::string& sym) : symbol(sym) {}
+
+ // Non-copyable due to atomics — use shared_ptr
+ SymbolRiskState(const SymbolRiskState&) = delete;
+ SymbolRiskState& operator=(const SymbolRiskState&) = delete;
+};
+
/**
* @class RiskManager
* @brief Singleton risk manager providing pre-trade checks and position
@@ -60,6 +85,9 @@ struct RiskState {
* The hot path (checkOrder) is fully lock-free, relying only on atomic loads.
* State mutations (onFill, onPnLUpdate) use atomic stores and acquire the mutex
* only when complex multi-field consistency is required.
+ *
+ * Per-symbol tracking is grow-only: once a symbol is registered, its
+ * SymbolRiskState pointer is stable so reads need no lock.
*/
class RiskManager {
public:
@@ -143,6 +171,35 @@ class RiskManager {
double getPositionUtilization() const;
double getDailyLossUtilization() const;
+ // --- Per-symbol tracking ---
+
+ /**
+ * @brief Register a symbol for per-symbol risk tracking
+ * @param symbol Trading symbol
+ */
+ void registerSymbol(const std::string& symbol);
+
+ /**
+ * @brief Get per-symbol risk state (nullptr if not registered)
+ */
+ SymbolRiskState* getSymbolState(const std::string& symbol);
+
+ /**
+ * @brief Get per-symbol risk state (const, nullptr if not registered)
+ */
+ const SymbolRiskState* getSymbolState(const std::string& symbol) const;
+
+ /**
+ * @brief Set per-symbol limits (overrides global for that symbol)
+ * @param limits Per-symbol limit configuration
+ */
+ void setSymbolLimits(const PerSymbolLimits& limits);
+
+ /**
+ * @brief Get per-symbol limits (returns nullptr if not set)
+ */
+ const PerSymbolLimits* getSymbolLimits(const std::string& symbol) const;
+
/**
* @brief Update risk limits at runtime
* @param limits New risk limits
@@ -205,6 +262,13 @@ class RiskManager {
std::string m_haltReason;
uint64_t m_dailyResetTime{0};
+ // Per-symbol state (grow-only map — reads safe without lock once inserted)
+ std::unordered_map>
+ m_symbolStates;
+ std::unordered_map m_symbolLimits;
+ mutable std::shared_mutex
+ m_symbolMutex; // shared for reads, exclusive for writes
+
// Hedge state
std::mutex m_hedgeMutex;
HedgeCallback m_hedgeCallback;
diff --git a/core/utils/LockFreeOrderBook.cpp b/core/utils/LockFreeOrderBook.cpp
index 8d75022..2985f2b 100644
--- a/core/utils/LockFreeOrderBook.cpp
+++ b/core/utils/LockFreeOrderBook.cpp
@@ -2,6 +2,9 @@
#include "../utils/TimeUtils.h"
#include
#include
+#if defined(_MSC_VER)
+#include
+#endif
namespace pinnacle {
namespace utils {
@@ -27,92 +30,94 @@ LockFreePriceLevel::~LockFreePriceLevel() {
}
}
-void LockFreePriceLevel::updateTotalQuantity() {
- double total = 0.0;
- OrderNode* current = m_head.load(std::memory_order_acquire)
- ->next.load(std::memory_order_acquire);
-
- while (current) {
- if (current->order) {
- total += current->order->getRemainingQuantity();
- }
- current = current->next.load(std::memory_order_acquire);
- }
-
- m_totalQuantity.store(total, std::memory_order_release);
-}
-
bool LockFreePriceLevel::addOrder(std::shared_ptr order) {
if (!order) {
return false;
}
- // Use exclusive lock to prevent readers during structure modification
+ double qty = order->getRemainingQuantity();
+
+ // Use exclusive lock — direct append, no CAS loop needed
std::unique_lock lock(m_nodeAccessMutex);
- // Create a new node
OrderNode* newNode = new OrderNode(std::move(order));
- // Add to the end of the list using a Michael-Scott queue approach
- while (true) {
- OrderNode* tail = m_tail.load(std::memory_order_acquire);
- OrderNode* next = tail->next.load(std::memory_order_acquire);
-
- // Check if tail is still the last node
- if (tail == m_tail.load(std::memory_order_acquire)) {
- if (next == nullptr) {
- // Tail is pointing to the last node, try to append the new node
- if (tail->next.compare_exchange_weak(next, newNode,
- std::memory_order_release,
- std::memory_order_relaxed)) {
- // Successfully appended the new node, try to update the tail
- m_tail.compare_exchange_strong(tail, newNode,
- std::memory_order_release,
- std::memory_order_relaxed);
- m_orderCount.fetch_add(1, std::memory_order_release);
- updateTotalQuantity();
- return true;
- }
- } else {
- // Tail is not pointing to the last node, try to help advance it
- m_tail.compare_exchange_strong(tail, next, std::memory_order_release,
- std::memory_order_relaxed);
- }
- }
- }
+ // Direct append: we hold exclusive lock so no concurrent modification
+ OrderNode* tail = m_tail.load(std::memory_order_relaxed);
+ tail->next.store(newNode, std::memory_order_relaxed);
+ m_tail.store(newNode, std::memory_order_relaxed);
+
+ m_orderCount.fetch_add(1, std::memory_order_release);
+
+ // O(1) quantity update instead of walking the list
+ double prev = m_totalQuantity.load(std::memory_order_relaxed);
+ m_totalQuantity.store(prev + qty, std::memory_order_release);
+
+ return true;
}
bool LockFreePriceLevel::removeOrder(const std::string& orderId) {
- // Use exclusive lock to prevent readers from accessing during modification
+ // Use exclusive lock — physically unlink the node
std::unique_lock lock(m_nodeAccessMutex);
- OrderNode* prev = m_head.load(std::memory_order_acquire);
- OrderNode* curr = prev->next.load(std::memory_order_acquire);
+ OrderNode* prev = m_head.load(std::memory_order_relaxed);
+ OrderNode* curr = prev->next.load(std::memory_order_relaxed);
- bool found = false;
while (curr != nullptr) {
- // Find the node to remove
if (curr->order && curr->order->getOrderId() == orderId) {
- found = true;
+ double qty = curr->order->getRemainingQuantity();
+
+ // Physical unlink: prev->next = curr->next
+ OrderNode* next = curr->next.load(std::memory_order_relaxed);
+ prev->next.store(next, std::memory_order_relaxed);
+
+ // Update tail if we removed the last node
+ if (m_tail.load(std::memory_order_relaxed) == curr) {
+ m_tail.store(prev, std::memory_order_relaxed);
+ }
+
+ delete curr;
- // Logical deletion: null out the order pointer
- // This prevents use-after-free while allowing safe traversal
- // The Order object will be automatically cleaned up by shared_ptr
- curr->order = nullptr;
m_orderCount.fetch_sub(1, std::memory_order_release);
- break;
+
+ // O(1) quantity update
+ double prevTotal = m_totalQuantity.load(std::memory_order_relaxed);
+ m_totalQuantity.store(std::max(0.0, prevTotal - qty),
+ std::memory_order_release);
+
+ return true;
}
prev = curr;
- curr = curr->next.load(std::memory_order_acquire);
+ curr = curr->next.load(std::memory_order_relaxed);
}
- // Update total quantity if an order was removed
- if (found) {
- updateTotalQuantity();
+ return false;
+}
+
+void LockFreePriceLevel::subtractQuantity(double qty) {
+ std::unique_lock lock(m_nodeAccessMutex);
+ double prev = m_totalQuantity.load(std::memory_order_relaxed);
+ m_totalQuantity.store(std::max(0.0, prev - qty), std::memory_order_release);
+}
+
+void LockFreePriceLevel::updateTotalQuantity() {
+ // Kept for compatibility but now rarely needed — add/remove do O(1) updates.
+ // This full-scan version can be used if quantities change externally.
+ std::shared_lock lock(m_nodeAccessMutex);
+
+ double total = 0.0;
+ OrderNode* current = m_head.load(std::memory_order_acquire)
+ ->next.load(std::memory_order_acquire);
+
+ while (current) {
+ if (current->order) {
+ total += current->order->getRemainingQuantity();
+ }
+ current = current->next.load(std::memory_order_acquire);
}
- return found;
+ m_totalQuantity.store(total, std::memory_order_release);
}
std::vector> LockFreePriceLevel::getOrders() const {
@@ -140,6 +145,8 @@ std::vector> LockFreePriceLevel::getOrders() const {
std::shared_ptr
LockFreePriceLevel::findOrder(const std::string& orderId) const {
+ std::shared_lock lock(m_nodeAccessMutex);
+
OrderNode* current = m_head.load(std::memory_order_acquire)
->next.load(std::memory_order_acquire);
@@ -155,6 +162,8 @@ LockFreePriceLevel::findOrder(const std::string& orderId) const {
void LockFreePriceLevel::forEachOrder(
const std::function)>& func) const {
+ std::shared_lock lock(m_nodeAccessMutex);
+
OrderNode* current = m_head.load(std::memory_order_acquire)
->next.load(std::memory_order_acquire);
@@ -170,8 +179,21 @@ void LockFreePriceLevel::forEachOrder(
LockFreeOrderMap::ShardGuard::ShardGuard(std::atomic_flag& lock)
: m_lock(lock) {
+ // Spin with pause hint to reduce contention
while (m_lock.test_and_set(std::memory_order_acquire)) {
- // Spin until we acquire the lock
+#if defined(_MSC_VER)
+#if defined(_M_X64) || defined(_M_IX86)
+ _mm_pause();
+#elif defined(_M_ARM64)
+ __yield();
+#endif
+#elif defined(__GNUC__) || defined(__clang__)
+#if defined(__x86_64__) || defined(__i386__)
+ __builtin_ia32_pause();
+#elif defined(__aarch64__)
+ __asm__ __volatile__("yield" ::: "memory");
+#endif
+#endif
}
}
@@ -258,12 +280,7 @@ bool LockFreeOrderBook::addOrder(std::shared_ptr order) {
return false;
}
- // Check if the order already exists
- if (m_orders.contains(order->getOrderId())) {
- return false;
- }
-
- // Add to the order map
+ // Single-step insert (returns false if already exists — no separate contains)
if (!m_orders.insert(order->getOrderId(), order)) {
return false;
}
@@ -416,6 +433,11 @@ bool LockFreeOrderBook::executeOrder(const std::string& orderId,
if (order->isBuy()) {
std::shared_ptr level = m_bids.findLevel(price);
if (level) {
+ // Subtract the filled quantity from the level total.
+ // This must happen before removeOrder, because removeOrder subtracts
+ // getRemainingQuantity() which is 0 for fully filled orders.
+ level->subtractQuantity(quantity);
+
// If order is fully filled, remove it
if (order->getStatus() == OrderStatus::FILLED) {
level->removeOrder(orderId);
@@ -432,6 +454,9 @@ bool LockFreeOrderBook::executeOrder(const std::string& orderId,
} else {
std::shared_ptr level = m_asks.findLevel(price);
if (level) {
+ // Subtract the filled quantity from the level total
+ level->subtractQuantity(quantity);
+
// If order is fully filled, remove it
if (order->getStatus() == OrderStatus::FILLED) {
level->removeOrder(orderId);
@@ -553,8 +578,6 @@ double LockFreeOrderBook::executeMarketOrder(
auto orders = level->getOrders();
std::vector filledOrders;
- double levelExecutedQty = 0.0;
- (void)levelExecutedQty; // Used for potential future logging
for (auto& order : orders) {
if (remainingQty <= 0)
@@ -567,7 +590,9 @@ double LockFreeOrderBook::executeMarketOrder(
fills.emplace_back(order->getOrderId(), fillQty);
remainingQty -= fillQty;
executedQuantity += fillQty;
- levelExecutedQty += fillQty;
+
+ // Subtract fill from level total before potential removal
+ level->subtractQuantity(fillQty);
if (order->getStatus() == OrderStatus::FILLED) {
filledOrders.push_back(order->getOrderId());
@@ -597,8 +622,6 @@ double LockFreeOrderBook::executeMarketOrder(
auto orders = level->getOrders();
std::vector filledOrders;
- double levelExecutedQty = 0.0;
- (void)levelExecutedQty; // Used for potential future logging
for (auto& order : orders) {
if (remainingQty <= 0)
@@ -611,7 +634,9 @@ double LockFreeOrderBook::executeMarketOrder(
fills.emplace_back(order->getOrderId(), fillQty);
remainingQty -= fillQty;
executedQuantity += fillQty;
- levelExecutedQty += fillQty;
+
+ // Subtract fill from level total before potential removal
+ level->subtractQuantity(fillQty);
if (order->getStatus() == OrderStatus::FILLED) {
filledOrders.push_back(order->getOrderId());
diff --git a/core/utils/LockFreeOrderBook.h b/core/utils/LockFreeOrderBook.h
index 70dcd64..e624662 100644
--- a/core/utils/LockFreeOrderBook.h
+++ b/core/utils/LockFreeOrderBook.h
@@ -93,6 +93,9 @@ class LockFreePriceLevel {
// Find order by ID
std::shared_ptr findOrder(const std::string& orderId) const;
+ // Subtract a quantity from the level total (used after fills)
+ void subtractQuantity(double qty);
+
// Apply a function to each order
void
forEachOrder(const std::function)>& func) const;
diff --git a/core/utils/ObjectPool.h b/core/utils/ObjectPool.h
new file mode 100644
index 0000000..2722c13
--- /dev/null
+++ b/core/utils/ObjectPool.h
@@ -0,0 +1,172 @@
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace pinnacle {
+namespace utils {
+
+/**
+ * @class ObjectPool
+ * @brief Thread-safe object pool template with custom deleter recycling
+ *
+ * Pre-allocates objects and returns them via shared_ptr with a custom deleter
+ * that recycles objects back to the pool instead of destroying them.
+ *
+ * Uses a SharedState pattern so that deleters safely handle the case where
+ * the pool is destroyed before all objects are returned.
+ *
+ * @tparam T The type of object to pool
+ */
+template class ObjectPool {
+public:
+ /**
+ * @brief Construct pool with initial capacity
+ * @param initialSize Number of objects to pre-allocate
+ */
+ explicit ObjectPool(size_t initialSize = 64)
+ : m_state(std::make_shared()) {
+ std::lock_guard lock(m_state->mutex);
+ m_state->pool.reserve(initialSize);
+ for (size_t i = 0; i < initialSize; ++i) {
+ m_state->pool.push_back(std::make_unique());
+ }
+ m_totalAllocated.store(initialSize, std::memory_order_relaxed);
+ }
+
+ /**
+ * @brief Construct pool with a factory function
+ * @param initialSize Number of objects to pre-allocate
+ * @param factory Function to create new objects
+ */
+ ObjectPool(size_t initialSize, std::function()> factory)
+ : m_state(std::make_shared()),
+ m_factory(std::move(factory)) {
+ std::lock_guard lock(m_state->mutex);
+ m_state->pool.reserve(initialSize);
+ for (size_t i = 0; i < initialSize; ++i) {
+ auto obj = m_factory ? m_factory() : std::make_unique();
+ if (!obj) {
+ throw std::runtime_error(
+ "ObjectPool factory returned nullptr during pre-allocation");
+ }
+ m_state->pool.push_back(std::move(obj));
+ }
+ m_totalAllocated.store(initialSize, std::memory_order_relaxed);
+ }
+
+ ~ObjectPool() { m_state->alive.store(false, std::memory_order_release); }
+
+ ObjectPool(const ObjectPool&) = delete;
+ ObjectPool& operator=(const ObjectPool&) = delete;
+
+ /**
+ * @brief Acquire an object from the pool
+ *
+ * Returns a shared_ptr with a custom deleter that recycles the object back
+ * to the pool when the last reference is released.
+ *
+ * @return shared_ptr to a pooled object
+ */
+ std::shared_ptr acquire() {
+ T* raw = nullptr;
+
+ {
+ std::lock_guard lock(m_state->mutex);
+ if (!m_state->pool.empty()) {
+ raw = m_state->pool.back().release();
+ m_state->pool.pop_back();
+ }
+ }
+
+ if (!raw) {
+ // Pool exhausted — allocate a new object
+ if (m_factory) {
+ auto obj = m_factory();
+ if (!obj) {
+ throw std::runtime_error(
+ "ObjectPool factory returned nullptr during acquire()");
+ }
+ raw = obj.release();
+ } else {
+ raw = new T();
+ }
+ m_totalAllocated.fetch_add(1, std::memory_order_relaxed);
+ }
+
+ m_acquireCount.fetch_add(1, std::memory_order_relaxed);
+
+ // Capture shared_ptr to SharedState (not `this`) so the deleter can
+ // safely access the mutex and pool even if the ObjectPool is destroyed.
+ auto state = m_state;
+ return std::shared_ptr(raw, [state](T* obj) noexcept {
+ if (state->alive.load(std::memory_order_acquire)) {
+ try {
+ std::lock_guard lock(state->mutex);
+ state->pool.push_back(std::unique_ptr(obj));
+ state->recycleCount.fetch_add(1, std::memory_order_relaxed);
+ } catch (...) {
+ // push_back failed (e.g., allocation) — delete directly
+ delete obj;
+ }
+ } else {
+ delete obj;
+ }
+ });
+ }
+
+ /**
+ * @brief Get current number of available objects in the pool
+ */
+ size_t available() const {
+ std::lock_guard lock(m_state->mutex);
+ return m_state->pool.size();
+ }
+
+ /**
+ * @brief Get total number of objects ever allocated by this pool
+ */
+ size_t totalAllocated() const {
+ return m_totalAllocated.load(std::memory_order_relaxed);
+ }
+
+ /**
+ * @brief Get total number of acquire() calls
+ */
+ size_t acquireCount() const {
+ return m_acquireCount.load(std::memory_order_relaxed);
+ }
+
+ /**
+ * @brief Get total number of recycle operations
+ */
+ size_t recycleCount() const {
+ return m_state->recycleCount.load(std::memory_order_relaxed);
+ }
+
+private:
+ /**
+ * @brief Shared state that outlives the ObjectPool if objects are still
+ * in flight. The deleter captures a shared_ptr so it can
+ * safely call recycle even after the ObjectPool destructor runs.
+ */
+ struct SharedState {
+ std::mutex mutex;
+ std::vector> pool;
+ std::atomic alive{true};
+ std::atomic recycleCount{0};
+ };
+
+ std::shared_ptr m_state;
+ std::function()> m_factory;
+
+ std::atomic m_totalAllocated{0};
+ std::atomic m_acquireCount{0};
+};
+
+} // namespace utils
+} // namespace pinnacle
diff --git a/core/utils/ThreadAffinity.cpp b/core/utils/ThreadAffinity.cpp
new file mode 100644
index 0000000..5296642
--- /dev/null
+++ b/core/utils/ThreadAffinity.cpp
@@ -0,0 +1,110 @@
+#include "ThreadAffinity.h"
+
+#include
+
+#ifdef __APPLE__
+#include
+#include
+#include
+#include
+#elif defined(__linux__)
+#include
+#include
+#endif
+
+namespace pinnacle {
+namespace utils {
+
+bool ThreadAffinity::pinToCore(int coreId) {
+ if (coreId < 0 || coreId >= getNumCores()) {
+ spdlog::warn("Invalid core ID {} (available: 0-{})", coreId,
+ getNumCores() - 1);
+ return false;
+ }
+
+#ifdef __APPLE__
+ // macOS uses thread affinity tags (hints, not hard pinning)
+ thread_affinity_policy_data_t policy;
+ policy.affinity_tag = coreId + 1; // 0 means no affinity
+ mach_port_t thread_port = mach_thread_self();
+ kern_return_t ret = thread_policy_set(
+ thread_port, THREAD_AFFINITY_POLICY,
+ reinterpret_cast(&policy), THREAD_AFFINITY_POLICY_COUNT);
+ mach_port_deallocate(mach_task_self(), thread_port);
+ if (ret != KERN_SUCCESS) {
+ spdlog::warn("Failed to set thread affinity to core {}", coreId);
+ return false;
+ }
+ return true;
+
+#elif defined(__linux__)
+ cpu_set_t cpuset;
+ CPU_ZERO(&cpuset);
+ CPU_SET(coreId, &cpuset);
+
+ int ret = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
+ if (ret != 0) {
+ spdlog::warn("Failed to set thread affinity to core {}: {}", coreId,
+ strerror(ret));
+ return false;
+ }
+ return true;
+
+#else
+ spdlog::warn("Thread affinity not supported on this platform");
+ return false;
+#endif
+}
+
+void ThreadAffinity::setThreadName(const std::string& name) {
+#ifdef __APPLE__
+ // macOS: only the calling thread can set its own name
+ pthread_setname_np(name.substr(0, 63).c_str());
+#elif defined(__linux__)
+ // Linux: name limited to 16 chars including null terminator
+ pthread_setname_np(pthread_self(), name.substr(0, 15).c_str());
+#endif
+}
+
+int ThreadAffinity::getNumCores() {
+ int cores = static_cast(std::thread::hardware_concurrency());
+ return cores > 0 ? cores : 1;
+}
+
+bool ThreadAffinity::pinThreadToCore(std::thread& thread, int coreId) {
+ if (!thread.joinable()) {
+ return false;
+ }
+
+ if (coreId < 0 || coreId >= getNumCores()) {
+ spdlog::warn("Invalid core ID {} (available: 0-{})", coreId,
+ getNumCores() - 1);
+ return false;
+ }
+
+#ifdef __APPLE__
+ // macOS doesn't support pinning another thread from outside easily.
+ // The thread itself should call pinToCore().
+ spdlog::debug("macOS: thread should call pinToCore() itself");
+ return false;
+
+#elif defined(__linux__)
+ cpu_set_t cpuset;
+ CPU_ZERO(&cpuset);
+ CPU_SET(coreId, &cpuset);
+
+ int ret = pthread_setaffinity_np(thread.native_handle(), sizeof(cpu_set_t),
+ &cpuset);
+ if (ret != 0) {
+ spdlog::warn("Failed to pin thread to core {}: {}", coreId, strerror(ret));
+ return false;
+ }
+ return true;
+
+#else
+ return false;
+#endif
+}
+
+} // namespace utils
+} // namespace pinnacle
diff --git a/core/utils/ThreadAffinity.h b/core/utils/ThreadAffinity.h
new file mode 100644
index 0000000..9d8fa0a
--- /dev/null
+++ b/core/utils/ThreadAffinity.h
@@ -0,0 +1,44 @@
+#pragma once
+
+#include
+#include
+
+namespace pinnacle {
+namespace utils {
+
+/**
+ * @class ThreadAffinity
+ * @brief Platform-specific CPU pinning and thread naming utilities
+ */
+class ThreadAffinity {
+public:
+ /**
+ * @brief Pin the calling thread to a specific CPU core
+ * @param coreId The CPU core to pin to (0-indexed)
+ * @return true if pinning succeeded
+ */
+ static bool pinToCore(int coreId);
+
+ /**
+ * @brief Set the name of the calling thread (for debugging/profiling)
+ * @param name Thread name (will be truncated to platform limit)
+ */
+ static void setThreadName(const std::string& name);
+
+ /**
+ * @brief Get the number of available CPU cores
+ * @return Number of hardware threads
+ */
+ static int getNumCores();
+
+ /**
+ * @brief Pin a given std::thread to a specific core
+ * @param thread Thread to pin
+ * @param coreId CPU core to pin to
+ * @return true if pinning succeeded
+ */
+ static bool pinThreadToCore(std::thread& thread, int coreId);
+};
+
+} // namespace utils
+} // namespace pinnacle
diff --git a/docs/CROSS_EXCHANGE_ARBITRAGE.md b/docs/CROSS_EXCHANGE_ARBITRAGE.md
new file mode 100644
index 0000000..0a57228
--- /dev/null
+++ b/docs/CROSS_EXCHANGE_ARBITRAGE.md
@@ -0,0 +1,149 @@
+# Cross-Exchange Arbitrage
+
+## Overview
+
+PinnacleMM includes a cross-exchange arbitrage system that detects and optionally executes price discrepancies across multiple venues. The system consists of two components:
+
+- **ArbitrageDetector**: Continuously scans venue quotes for profitable spreads
+- **ArbitrageExecutor**: Submits simultaneous buy/sell orders to capture the spread
+
+## Quick Start
+
+```bash
+# Enable arbitrage detection in dry-run mode (logs opportunities, no execution)
+cd build && ./pinnaclemm --mode simulation --symbols BTC-USD --enable-arbitrage --arb-dry-run
+
+# Customize minimum spread threshold
+cd build && ./pinnaclemm --mode simulation --enable-arbitrage --arb-min-spread 10.0 --arb-dry-run
+```
+
+## CLI Flags
+
+| Flag | Description | Default |
+|------|-------------|---------|
+| `--enable-arbitrage` | Enable the arbitrage detector | `false` |
+| `--arb-min-spread` | Minimum spread in basis points | `5.0` |
+| `--arb-dry-run` | Log opportunities without executing | `true` |
+
+## Configuration
+
+### `config/default_config.json`
+
+```json
+{
+ "arbitrage": {
+ "enabled": false,
+ "minSpreadBps": 5.0,
+ "minProfitUsd": 1.0,
+ "maxStalenessMs": 500,
+ "scanIntervalMs": 10,
+ "dryRun": true,
+ "venues": ["coinbase", "kraken"],
+ "venueFees": {
+ "coinbase": 0.001,
+ "kraken": 0.0016
+ }
+ }
+}
+```
+
+### Configuration Fields
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `enabled` | bool | Master switch for arbitrage |
+| `minSpreadBps` | double | Minimum net spread (after fees) in bps to consider |
+| `minProfitUsd` | double | Minimum estimated profit in USD |
+| `maxStalenessMs` | uint64 | Maximum quote age before it's considered stale |
+| `scanIntervalMs` | uint64 | How often the detector scans for opportunities |
+| `dryRun` | bool | If true, log opportunities without executing |
+| `venues` | string[] | List of venue identifiers |
+| `venueFees` | map | Per-venue trading fee as a fraction (e.g., 0.001 = 0.1%) |
+
+## Architecture
+
+### Opportunity Detection
+
+The `ArbitrageDetector` maintains a per-venue, per-symbol quote cache. On each scan cycle:
+
+1. For each symbol, enumerate all venue pairs
+2. For each pair, check if `venue_A.bid - venue_B.ask > fees`
+3. Apply staleness filtering (reject quotes older than `maxStalenessMs`)
+4. Apply fee adjustment: `net_spread = (bid - ask) - (ask * fee_buy) - (bid * fee_sell)`
+5. Convert to basis points: `spreadBps = (net_spread / midPrice) * 10000` where `midPrice = (ask + bid) / 2`
+6. Filter by `minSpreadBps` and `minProfitUsd`
+
+### Data Flow
+
+```mermaid
+graph TD
+ A["Venue WebSocket Feeds"] --> B["updateVenueQuote
(coinbase, BTC-USD, bid, bidSize, ask, askSize, ts)"]
+ A --> C["updateVenueQuote
(kraken, BTC-USD, bid, bidSize, ask, askSize, ts)"]
+
+ B --> D["ArbitrageDetector
(background scan thread)"]
+ C --> D
+
+ D --> E["detectOpportunities(BTC-USD)"]
+ E --> F["Compare all venue pairs,
apply fees, filter"]
+
+ F --> G["opportunityCallback
(ArbitrageOpportunity)"]
+
+ G --> H["ArbitrageExecutor"]
+
+ H --> I["Dry-run:
log opportunity"]
+ H --> J["Live:
submit buy + sell
via OrderRouter"]
+
+ style A fill:#4a90d9,stroke:#2c5f8a,color:#fff
+ style D fill:#e8a838,stroke:#b07c1e,color:#fff
+ style H fill:#5cb85c,stroke:#3d8b3d,color:#fff
+ style I fill:#f0ad4e,stroke:#c78c2e,color:#fff
+ style J fill:#d9534f,stroke:#a94442,color:#fff
+```
+
+### ArbitrageOpportunity
+
+```cpp
+struct ArbitrageOpportunity {
+ std::string symbol;
+ std::string buyVenue; // Venue with lowest ask
+ std::string sellVenue; // Venue with highest bid
+ double buyPrice; // Best ask at buy venue
+ double sellPrice; // Best bid at sell venue
+ double spread; // Net spread after fees (sellPrice - buyPrice - fees)
+ double spreadBps; // Net spread in basis points: (spread / midPrice) * 10000
+ double maxQuantity; // Min of buy/sell available size
+ double estimatedProfit; // spread * maxQuantity
+ uint64_t detectedAt; // Nanosecond timestamp
+};
+```
+
+### ArbitrageExecutor
+
+The executor supports two modes:
+
+- **Dry-run** (`dryRun: true`): Simulates execution, returns synthetic fill results
+- **Live** (`dryRun: false`): Uses an `OrderSubmitCallback` to route orders through `OrderRouter`
+
+### Risk Controls
+
+- **Staleness filter**: Quotes older than `maxStalenessMs` are rejected
+- **Fee adjustment**: All opportunities are evaluated net of trading fees
+- **Minimum thresholds**: Both `minSpreadBps` and `minProfitUsd` must be met
+- **Dry-run default**: Production deployments should start in dry-run mode
+
+## Testing
+
+```bash
+cd build
+./arbitrage_detector_tests # 8 tests
+```
+
+Test cases cover:
+- Opportunity detection with sufficient spread
+- Fee adjustment eliminating thin spreads
+- Staleness filtering of old quotes
+- Minimum spread threshold enforcement
+- Dry-run execution simulation
+- Opportunity callback invocation
+- Statistics reporting
+- Single-venue (no self-arbitrage)
diff --git a/docs/CROSS_MARKET_CORRELATION.md b/docs/CROSS_MARKET_CORRELATION.md
new file mode 100644
index 0000000..7e596ab
--- /dev/null
+++ b/docs/CROSS_MARKET_CORRELATION.md
@@ -0,0 +1,139 @@
+# Cross-Market Correlation Analysis
+
+## Overview
+
+The `CrossMarketCorrelation` engine provides statistical models to detect when one instrument's price movement predicts another's. It implements four analysis methods:
+
+1. **Pearson Correlation**: Full-sample linear correlation of log returns
+2. **Rolling Correlation**: Sliding-window correlation for regime sensitivity
+3. **Lead-Lag Analysis**: Detects if one instrument's returns predict another's with a time offset
+4. **Engle-Granger Cointegration**: Tests whether two price series share a long-run equilibrium
+
+## Integration with MLEnhancedMarketMaker
+
+When cross-market signals are active, the `MLEnhancedMarketMaker` adjusts its spread based on the expected move magnitude and confidence from the leading instrument:
+
+```cpp
+// Wire up in main.cpp or strategy initialization
+CrossMarketCorrelation crossMarket(config);
+crossMarket.addPair("BTC-USD", "ETH-USD");
+
+mlStrategy.setCrossMarketCorrelation(&crossMarket);
+```
+
+The spread adjustment widens when a correlated leader instrument shows a large expected move, protecting against adverse selection.
+
+## Configuration
+
+```cpp
+struct CrossMarketConfig {
+ size_t returnWindowSize{100}; // Window for return calculations
+ size_t rollingWindowSize{30}; // Rolling correlation window
+ int maxLagBars{10}; // Max lead-lag offset to test
+ double minCorrelation{0.5}; // Min |correlation| to consider
+ double signalThreshold{0.3}; // Min signal strength to emit
+ double cointegrationPValue{0.05}; // Significance level
+};
+```
+
+In `MLEnhancedMarketMaker::MLConfig`:
+
+```cpp
+bool enableCrossMarketSignals{false};
+double crossMarketSpreadAdjustmentWeight{0.2};
+```
+
+## Statistical Methods
+
+### Pearson Correlation
+
+Computed on log returns (not raw prices) to improve stationarity (formal tests like ADF may still be needed):
+
+```
+r_i = log(P_i / P_{i-1})
+corr(A, B) = Cov(r_A, r_B) / (Std(r_A) * Std(r_B))
+```
+
+Range: [-1, 1]. Values near +1 indicate co-movement; near -1 indicate inverse movement.
+
+### Rolling Correlation
+
+Same as Pearson but computed over the most recent `rollingWindowSize` returns. This captures regime-dependent correlation shifts.
+
+### Lead-Lag Analysis
+
+For each lag offset `k` in `[-maxLagBars, +maxLagBars]`, compute:
+
+```
+corr(r_A[t], r_B[t+k])
+```
+
+The lag with the highest absolute correlation is reported. Positive `leadLagBarsA` means A leads B.
+
+### Engle-Granger Cointegration
+
+1. Regress prices: `P_A = alpha + beta * P_B + epsilon`
+2. Compute residuals `e_t = P_A_t - alpha - beta * P_B_t`
+3. Run simplified ADF test on residuals: `delta_e_t = gamma * e_{t-1} + noise`
+4. If t-statistic for gamma < -3.37 (5% critical value), the pair is cointegrated
+
+Cointegrated pairs have a mean-reverting spread, making them candidates for pairs trading.
+
+## Signal Generation
+
+Signals are generated when:
+1. A pair has `|leadLagCoefficient| >= minCorrelation`
+2. The leader has a non-zero lag offset (`leadLagBarsA != 0`)
+3. The leader's recent return multiplied by the coefficient exceeds `signalThreshold`
+
+```cpp
+struct CrossMarketSignal {
+ std::string leadSymbol; // The instrument that moves first
+ std::string lagSymbol; // The instrument expected to follow
+ double signalStrength; // [0, 1]
+ double expectedMove; // Expected % move in lag symbol
+ double confidence; // Based on rolling correlation
+ uint64_t timestamp;
+};
+```
+
+## Usage
+
+```cpp
+CrossMarketConfig cfg;
+cfg.returnWindowSize = 100;
+cfg.minCorrelation = 0.5;
+
+CrossMarketCorrelation engine(cfg);
+engine.addPair("BTC-USD", "ETH-USD");
+
+// Feed price observations (from market data feeds)
+engine.addPriceObservation("BTC-USD", 50000.0, 1000.0, timestampNs);
+engine.addPriceObservation("ETH-USD", 3000.0, 5000.0, timestampNs);
+
+// Query correlation
+auto corr = engine.getCorrelation("BTC-USD", "ETH-USD");
+// corr.pearsonCorrelation, corr.leadLagBarsA, corr.isCointegrated
+
+// Get active trading signals
+auto signals = engine.getActiveSignals();
+
+// Get statistics
+std::string stats = engine.getStatistics();
+```
+
+## Testing
+
+```bash
+cd build
+./cross_market_correlation_tests # 7 tests
+```
+
+Test cases:
+- Perfectly correlated series (Pearson ~ 1.0)
+- Inversely correlated series (Pearson < -0.8)
+- Lead-lag detection with 2-bar offset
+- Cointegration detection (linear relationship + noise)
+- Uncorrelated data (low |correlation|)
+- Signal generation from lead-lag patterns
+- Statistics output
diff --git a/docs/MULTI_INSTRUMENT_GUIDE.md b/docs/MULTI_INSTRUMENT_GUIDE.md
new file mode 100644
index 0000000..712a73d
--- /dev/null
+++ b/docs/MULTI_INSTRUMENT_GUIDE.md
@@ -0,0 +1,127 @@
+# Multi-Instrument Trading Guide
+
+## Overview
+
+PinnacleMM supports simultaneous trading across multiple instruments, each with its own order book, strategy instance, and optional exchange simulator. The `InstrumentManager` class orchestrates per-instrument lifecycles while sharing global risk management and order routing infrastructure.
+
+## Quick Start
+
+```bash
+# Trade two instruments in simulation mode
+cd build && ./pinnaclemm --mode simulation --symbols BTC-USD,ETH-USD
+
+# Trade with ML enabled on all instruments
+cd build && ./pinnaclemm --mode simulation --symbols BTC-USD,ETH-USD --enable-ml
+
+# Single-symbol mode still works (backward compatible)
+cd build && ./pinnaclemm --mode simulation --symbol BTC-USD
+```
+
+## CLI Flags
+
+| Flag | Description | Default |
+|------|-------------|---------|
+| `--symbols` | Comma-separated list of instruments | (uses `--symbol`) |
+| `--symbol` | Single instrument (backward compat) | `BTC-USD` |
+
+When `--symbols` is provided, it takes precedence over `--symbol`.
+
+## Configuration
+
+### `config/default_config.json`
+
+The `instruments` array defines per-instrument settings:
+
+```json
+{
+ "instruments": [
+ {
+ "symbol": "BTC-USD",
+ "enabled": true,
+ "useLockFree": true,
+ "enableML": false,
+ "baseSpreadBps": 10.0,
+ "orderQuantity": 0.01,
+ "maxPosition": 10.0
+ },
+ {
+ "symbol": "ETH-USD",
+ "enabled": true,
+ "useLockFree": false,
+ "enableML": true,
+ "baseSpreadBps": 15.0,
+ "orderQuantity": 0.1,
+ "maxPosition": 50.0
+ }
+ ]
+}
+```
+
+### Per-Instrument Fields
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `symbol` | string | Trading pair identifier |
+| `enabled` | bool | Whether this instrument is active |
+| `useLockFree` | bool | Use lock-free order book implementation |
+| `enableML` | bool | Enable ML-enhanced strategy for this instrument |
+| `baseSpreadBps` | double | Base spread in basis points |
+| `orderQuantity` | double | Default order size |
+| `maxPosition` | double | Maximum position for this instrument |
+
+## Architecture
+
+### InstrumentManager
+
+The `InstrumentManager` owns a map of `InstrumentContext` objects, each containing:
+
+```
+InstrumentContext
+ ├── symbol: std::string
+ ├── orderBook: std::shared_ptr
+ ├── strategy: std::shared_ptr
+ └── simulator: std::shared_ptr (null in live mode)
+```
+
+### Lifecycle
+
+1. `addInstrument(config, mode)` — Creates order book, strategy, and simulator for the given symbol
+2. `startAll()` — Starts all registered instruments
+3. Main loop — Each instrument runs independently; stats are aggregated
+4. `stopAll()` — Gracefully stops all instruments
+
+### Per-Symbol Risk Tracking
+
+When multiple instruments are active, the `RiskManager` tracks position, PnL, and volume per-symbol using atomic state. See the risk management section for details.
+
+```cpp
+// Register symbols for per-symbol tracking
+rm.registerSymbol("BTC-USD");
+rm.registerSymbol("ETH-USD");
+
+// Set per-symbol limits (optional — falls back to global)
+PerSymbolLimits btcLimits;
+btcLimits.symbol = "BTC-USD";
+btcLimits.maxPositionSize = 5.0;
+rm.setSymbolLimits(btcLimits);
+
+// Query per-symbol state
+auto* btcState = rm.getSymbolState("BTC-USD");
+double btcPosition = btcState->position.load();
+```
+
+## Performance Considerations
+
+- Each instrument runs its own strategy thread and simulator
+- The `ResourceAllocator` distributes CPU cores across instruments based on available hardware
+- Lock-free order books are recommended for high-throughput instruments
+- Global risk checks remain lock-free regardless of instrument count
+- Object pooling reduces allocation overhead on hot paths
+
+## Testing
+
+```bash
+cd build
+./instrument_manager_tests # 9 tests covering lifecycle management
+./multi_instrument_benchmark # Startup scaling and throughput benchmarks
+```
diff --git a/docs/PERFORMANCE_OPTIMIZATION_GUIDE.md b/docs/PERFORMANCE_OPTIMIZATION_GUIDE.md
new file mode 100644
index 0000000..ff1f4d5
--- /dev/null
+++ b/docs/PERFORMANCE_OPTIMIZATION_GUIDE.md
@@ -0,0 +1,174 @@
+# Performance Optimization Guide
+
+## Overview
+
+Phase 5 introduced several performance optimizations to PinnacleMM:
+
+1. **Lock-Free OrderBook Fix**: Eliminated 56x regression, now 4.5x faster than mutex
+2. **Object Pool**: Thread-safe allocation recycling for hot-path objects
+3. **CPU Affinity & Thread Pinning**: Platform-specific core assignment
+4. **Link-Time Optimization (LTO)**: Whole-program optimization at link time
+5. **Dynamic Resource Allocation**: Automatic CPU core distribution across instruments
+
+## Lock-Free OrderBook Optimization
+
+### Problem
+
+The lock-free order book was 56x slower than the mutex-based implementation at 1000 orders due to:
+
+- **O(n) quantity recalculation**: `updateTotalQuantity()` walked the entire linked list on every add/remove
+- **Redundant CAS loops**: Michael-Scott queue CAS retries inside an already-held exclusive lock
+- **Logical deletion without physical unlinking**: Removed nodes stayed in the list, growing walk times
+- **Duplicate shard lock acquisitions**: Separate `contains()` + `insert()` calls
+
+### Fix
+
+| Issue | Fix | Complexity Change |
+|-------|-----|-------------------|
+| `updateTotalQuantity()` full scan | O(1) atomic add/subtract on add/remove | O(n) -> O(1) per op |
+| CAS loop under exclusive lock | Direct pointer append | Eliminates retries |
+| Logical deletion (null pointer) | Physical node unlinking + delete | Prevents list growth |
+| `contains()` + `insert()` | Single `insert()` check | 2 locks -> 1 lock |
+| Spinlock busy-wait | Platform-specific yield hint | Reduces CPU waste |
+
+### Results
+
+| Metric | Before | After | Improvement |
+|--------|--------|-------|-------------|
+| LockFree 100 orders | N/A | 0.17ms | - |
+| LockFree 1000 orders | 75.2ms | 1.79ms | **42x faster** |
+| vs Mutex 1000 orders | 56x slower | 4.5x faster | Regression eliminated |
+| LockFree 10000 orders | N/A | 20.2ms | - |
+| vs Mutex 10000 orders | - | 3.3x faster | - |
+
+## Object Pool
+
+### `core/utils/ObjectPool.h`
+
+Header-only, thread-safe object pool template for hot-path allocation:
+
+```cpp
+#include "core/utils/ObjectPool.h"
+
+// Create pool with 1000 pre-allocated Order objects
+pinnacle::utils::ObjectPool pool(1000);
+
+// Acquire returns shared_ptr with custom deleter that recycles
+auto order = pool.acquire("id", "BTC-USD", OrderSide::BUY, OrderType::LIMIT, 100.0, 1.0, ts);
+
+// When shared_ptr ref count reaches 0, object is recycled back to pool
+// No heap allocation on acquire if pool has available objects
+```
+
+### Design
+
+- Pre-allocates objects in constructor
+- `acquire()` returns `std::shared_ptr` with a custom deleter
+- Custom deleter resets and recycles the object back to the pool
+- Falls back to `new` allocation if pool is exhausted
+- Thread-safe via `std::mutex` on the free list
+
+### When to Use
+
+- Order allocation in `ExchangeSimulator` and `BasicMarketMaker`
+- Any hot-path object that is frequently created and destroyed
+- Objects with non-trivial construction cost
+
+## CPU Affinity & Thread Pinning
+
+### `core/utils/ThreadAffinity.h`
+
+Platform-specific thread pinning for latency-sensitive threads:
+
+```cpp
+#include "core/utils/ThreadAffinity.h"
+
+// Pin current thread to core 2
+pinnacle::utils::ThreadAffinity::pinToCore(2);
+
+// Set thread name (visible in profilers)
+pinnacle::utils::ThreadAffinity::setThreadName("strategy-btc");
+
+// Query available cores
+int cores = pinnacle::utils::ThreadAffinity::getNumCores();
+
+// Pin a specific thread
+pinnacle::utils::ThreadAffinity::pinThreadToCore(myThread, 3);
+```
+
+### Platform Support
+
+| Platform | Thread Pinning | Thread Naming |
+|----------|---------------|---------------|
+| macOS (ARM/x86) | `thread_affinity_policy` | `pthread_setname_np` |
+| Linux | `pthread_setaffinity_np` | `pthread_setname_np` |
+| Other | No-op (returns false) | No-op |
+
+## Link-Time Optimization (LTO)
+
+### CMake Configuration
+
+LTO is enabled via the `ENABLE_LTO` option:
+
+```bash
+cmake -DENABLE_LTO=ON ..
+make -j$(nproc)
+```
+
+When enabled, the compiler performs whole-program optimization across translation units, enabling:
+- Cross-file inlining
+- Dead code elimination
+- Interprocedural constant propagation
+
+### Impact
+
+LTO typically provides 5-15% throughput improvement for compute-bound workloads. The overhead is increased link time.
+
+## Dynamic Resource Allocation
+
+### `core/instrument/ResourceAllocator.h`
+
+Automatically distributes CPU cores across instruments:
+
+```cpp
+ResourceAllocator allocator;
+auto assignments = allocator.allocate(instrumentCount);
+
+for (const auto& assignment : assignments) {
+ // assignment.instrumentIndex - which instrument
+ // assignment.coreId - which CPU core to pin to
+ // assignment.priority - relative priority (0 = highest)
+}
+```
+
+### Allocation Strategy
+
+1. Core 0 is reserved for OS/kernel work
+2. Remaining cores are distributed round-robin across instruments
+3. Priority is assigned based on order (lower index = higher priority)
+
+## Benchmarks
+
+```bash
+cd build
+
+# Order book comparison (mutex vs lock-free)
+./orderbook_benchmark
+
+# Multi-instrument scaling
+./multi_instrument_benchmark
+
+# Key benchmarks in multi_instrument_benchmark:
+# SingleInstrumentStartup — Baseline startup time
+# MultiInstrumentStartup/1..8 — Scaling with 1-8 instruments
+# ObjectPoolAcquireRelease — Pool vs raw allocation
+# ObjectPoolContended/1..4 — Multi-threaded pool performance
+```
+
+## Profiling Tips
+
+1. **CPU profiling**: Use `perf record` (Linux) or Instruments (macOS) to identify hotspots
+2. **Cache analysis**: `perf stat -e cache-misses` to check cache behavior
+3. **Lock contention**: Monitor spinlock spin counts in `LockFreeOrderMap::ShardGuard`
+4. **Memory allocation**: Use `jemalloc` or `tcmalloc` for production builds
+5. **NUMA awareness**: On multi-socket systems, pin threads to cores near their memory
diff --git a/docs/ROADMAP.md b/docs/ROADMAP.md
index fc75c4f..b12b0ff 100644
--- a/docs/ROADMAP.md
+++ b/docs/ROADMAP.md
@@ -152,15 +152,41 @@ PinnacleMM is an ultra-low latency market making system designed for high-freque
**Goal:** Fine-tune performance and scale to multiple markets.
### Deliverables
-- 🔲 Multi-instrument support
-- 🔲 Cross-exchange arbitrage capabilities
-- 🔲 Advanced statistical models for cross-market correlations
-- 🔲 Dynamic resource allocation
-- 🔲 Performance profiling and additional optimizations
-- 🔲 Comprehensive documentation and case studies
-
-### Expected Completion
-- 4 weeks
+- Multi-instrument support via `InstrumentManager` with `--symbols` CLI flag
+- Cross-exchange arbitrage detection and execution (dry-run and live)
+- Cross-market correlation analysis (Pearson, rolling, lead-lag, Engle-Granger cointegration)
+- Per-symbol risk tracking with atomic state and per-symbol position limits
+- Lock-free order book performance fix (56x regression eliminated, now 4.5x faster than mutex)
+- Object pool for hot-path allocation recycling
+- CPU affinity and thread pinning (macOS + Linux)
+- Link-Time Optimization (LTO) build option
+- Dynamic resource allocation for multi-instrument deployments
+- Multi-instrument benchmarks and scaling tests
+- MLEnhancedMarketMaker integration with cross-market signals
+
+### Key Components Implemented
+- **InstrumentManager**: Central orchestrator for per-symbol {orderbook, strategy, simulator} tuples (9 unit tests)
+- **ArbitrageDetector/Executor**: Background scan thread with venue quote cache, fee-adjusted opportunity detection, dry-run support (8 unit tests)
+- **CrossMarketCorrelation**: Pearson/rolling correlation, lead-lag analysis, simplified Engle-Granger ADF test, signal generation for MLEnhancedMarketMaker (7 unit tests)
+- **Per-Symbol Risk**: `SymbolRiskState` with atomics, `registerSymbol()`, `setSymbolLimits()`, `getSymbolState()` (4 new unit tests, 15 total passing)
+- **LockFreeOrderBook Fix**: O(1) quantity updates, physical node unlinking, eliminated CAS retries, platform-specific yield hints
+- **ObjectPool**: Header-only thread-safe pool template with custom shared_ptr deleter
+- **ThreadAffinity**: `pinToCore()`, `setThreadName()`, `getNumCores()` for macOS and Linux
+- **ResourceAllocator**: CPU core distribution across instruments
+
+### Testing
+- 39 new unit tests across 5 test suites (all passing)
+- Multi-instrument benchmark for startup scaling and throughput
+- Full regression check — no existing test regressions
+
+### Documentation
+- [Multi-Instrument Guide](MULTI_INSTRUMENT_GUIDE.md)
+- [Cross-Exchange Arbitrage](CROSS_EXCHANGE_ARBITRAGE.md)
+- [Cross-Market Correlation](CROSS_MARKET_CORRELATION.md)
+- [Performance Optimization Guide](PERFORMANCE_OPTIMIZATION_GUIDE.md)
+
+### Completion
+- **Completed**: March 2026
## Testing Integration
diff --git a/main.cpp b/main.cpp
index 51bad6d..005be86 100644
--- a/main.cpp
+++ b/main.cpp
@@ -1,3 +1,4 @@
+#include "core/instrument/InstrumentManager.h"
#include "core/orderbook/LockFreeOrderBook.h"
#include "core/orderbook/OrderBook.h"
#include "core/persistence/PersistenceManager.h"
@@ -14,6 +15,8 @@
#include "exchange/connector/ExchangeConnectorFactory.h"
#include "exchange/connector/SecureConfig.h"
#include "exchange/simulator/ExchangeSimulator.h"
+#include "strategies/arbitrage/ArbitrageDetector.h"
+#include "strategies/arbitrage/ArbitrageExecutor.h"
#include "strategies/backtesting/BacktestEngine.h"
#include "strategies/basic/BasicMarketMaker.h"
#include "strategies/basic/MLEnhancedMarketMaker.h"
@@ -32,6 +35,7 @@
#include
#include
#include
+#include
#include
#include
@@ -204,7 +208,15 @@ int main(int argc, char* argv[]) {
"slippage-bps", po::value()->default_value(2.0),
"Slippage in basis points for backtest")(
"backtest-duration", po::value()->default_value(3600),
- "Backtest duration in seconds (default: 3600 = 1 hour)");
+ "Backtest duration in seconds (default: 3600 = 1 hour)")(
+ "symbols", po::value(),
+ "Comma-separated list of symbols (e.g. BTC-USD,ETH-USD)")(
+ "enable-arbitrage", po::bool_switch()->default_value(false),
+ "Enable cross-exchange arbitrage detection")(
+ "arb-min-spread", po::value()->default_value(5.0),
+ "Minimum spread in bps for arbitrage")(
+ "arb-dry-run", po::bool_switch()->default_value(true),
+ "Arbitrage dry-run mode (log only, no execution)");
po::variables_map vm;
po::store(po::parse_command_line(argc, argv, desc), vm);
@@ -230,6 +242,23 @@ int main(int argc, char* argv[]) {
bool verbose = vm["verbose"].as();
bool useLockFree = vm["lock-free"].as();
+ // Parse multi-symbol flag
+ std::vector symbols;
+ if (vm.count("symbols")) {
+ std::string symbolsStr = vm["symbols"].as();
+ std::istringstream iss(symbolsStr);
+ std::string tok;
+ while (std::getline(iss, tok, ',')) {
+ if (!tok.empty()) {
+ symbols.push_back(tok);
+ }
+ }
+ }
+ if (symbols.empty()) {
+ symbols.push_back(symbol); // fallback to --symbol
+ }
+ bool multiInstrument = symbols.size() > 1;
+
// Initialize logger
auto console_sink = std::make_shared();
auto file_sink =
@@ -243,7 +272,18 @@ int main(int argc, char* argv[]) {
}
spdlog::set_default_logger(logger);
- spdlog::info("Starting PinnacleMM for {} in {} mode", symbol, mode);
+ if (multiInstrument) {
+ std::string symbolList;
+ for (size_t i = 0; i < symbols.size(); ++i) {
+ if (i > 0)
+ symbolList += ", ";
+ symbolList += symbols[i];
+ }
+ spdlog::info("Starting PinnacleMM with {} instruments [{}] in {} mode",
+ symbols.size(), symbolList, mode);
+ } else {
+ spdlog::info("Starting PinnacleMM for {} in {} mode", symbol, mode);
+ }
spdlog::info("Using lock-free data structures: {}",
useLockFree ? "enabled" : "disabled");
@@ -355,6 +395,135 @@ int main(int argc, char* argv[]) {
"CircuitBreaker");
});
+ // Register all symbols with the risk manager for per-symbol tracking
+ for (const auto& sym : symbols) {
+ riskManager.registerSymbol(sym);
+ }
+
+ // Apply per-symbol limits from config
+ for (const auto& psl : riskConfig.perSymbolLimits) {
+ riskManager.setSymbolLimits(psl);
+ }
+
+ // InstrumentManager for multi-instrument mode
+ pinnacle::instrument::InstrumentManager instrumentManager;
+ bool enableML = vm["enable-ml"].as();
+
+ // Initialize JSON logger if enabled
+ std::shared_ptr jsonLogger;
+ if (vm["json-log"].as()) {
+ std::string jsonLogFile = vm["json-log-file"].as();
+ jsonLogger =
+ std::make_shared(jsonLogFile, true);
+ spdlog::info("JSON logging enabled, output file: {}", jsonLogFile);
+ }
+
+ if (multiInstrument) {
+ // Multi-instrument path: use InstrumentManager
+ for (const auto& sym : symbols) {
+ pinnacle::instrument::InstrumentConfig instCfg;
+ instCfg.symbol = sym;
+ instCfg.useLockFree = useLockFree;
+ instCfg.enableML = enableML;
+ instrumentManager.addInstrument(instCfg, mode);
+ }
+
+ // For backtest mode with multiple instruments, not yet supported
+ if (mode == "backtest") {
+ spdlog::warn("Backtest mode with multiple instruments not yet "
+ "supported. Using first symbol only.");
+ // Fall through to single-instrument backtest below
+ } else if (mode != "live") {
+ // Simulation mode: start all instruments
+ if (!instrumentManager.startAll()) {
+ spdlog::error("Failed to start all instruments");
+ return 1;
+ }
+
+ spdlog::info("All {} instruments started in simulation mode",
+ symbols.size());
+
+ // Setup arbitrage if enabled
+ std::unique_ptr arbDetector;
+ std::unique_ptr arbExecutor;
+ if (vm["enable-arbitrage"].as()) {
+ pinnacle::arbitrage::ArbitrageConfig arbConfig;
+ arbConfig.minSpreadBps = vm["arb-min-spread"].as();
+ arbConfig.dryRun = vm["arb-dry-run"].as();
+ arbConfig.symbols = symbols;
+ arbConfig.scanIntervalMs = 100;
+
+ arbDetector =
+ std::make_unique(
+ arbConfig);
+ arbExecutor =
+ std::make_unique(
+ arbConfig.dryRun);
+
+ arbDetector->setOpportunityCallback(
+ [&arbExecutor](
+ const pinnacle::arbitrage::ArbitrageOpportunity& opp) {
+ arbExecutor->execute(opp);
+ });
+
+ arbDetector->start();
+ spdlog::info("Arbitrage detector started (dryRun={})",
+ arbConfig.dryRun);
+ }
+
+ // Main loop for multi-instrument simulation
+ uint64_t lastStatsTime = 0;
+ uint64_t lastCheckpointTime = 0;
+
+ while (g_running.load()) {
+ uint64_t currentTime = pinnacle::utils::TimeUtils::getCurrentMillis();
+
+ if (currentTime - lastStatsTime > 5000) {
+ spdlog::info("======================");
+ spdlog::info("Current time: {}",
+ pinnacle::utils::TimeUtils::getCurrentISOTimestamp());
+ spdlog::info("{}", instrumentManager.getAggregateStatistics());
+
+ if (arbDetector) {
+ spdlog::info("{}", arbDetector->getStatistics());
+ }
+
+ spdlog::info("======================");
+ lastStatsTime = currentTime;
+ }
+
+ if (currentTime - lastCheckpointTime > 5 * 60 * 1000) {
+ instrumentManager.createCheckpoints();
+ lastCheckpointTime = currentTime;
+ }
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
+
+ // Shutdown
+ spdlog::info("Shutting down multi-instrument mode...");
+
+ if (arbDetector) {
+ arbDetector->stop();
+ }
+
+ instrumentManager.stopAll();
+
+ if (varEngine) {
+ varEngine->stop();
+ }
+
+ spdlog::info("Final statistics:");
+ spdlog::info("{}", instrumentManager.getAggregateStatistics());
+
+ AUDIT_SYSTEM_EVENT("PinnacleMM system shutdown complete", true);
+ spdlog::info("Shutdown complete");
+ return 0;
+ }
+ }
+
+ // --- Single-instrument path (backward compatible) ---
+
// Create or retrieve order book
std::shared_ptr orderBook;
@@ -382,18 +551,8 @@ int main(int argc, char* argv[]) {
config.symbol = symbol;
// Initialize strategy (basic or ML-enhanced)
- bool enableML = vm["enable-ml"].as();
std::shared_ptr strategy;
- // Initialize JSON logger if enabled
- std::shared_ptr jsonLogger;
- if (vm["json-log"].as()) {
- std::string jsonLogFile = vm["json-log-file"].as();
- jsonLogger =
- std::make_shared(jsonLogFile, true);
- spdlog::info("JSON logging enabled, output file: {}", jsonLogFile);
- }
-
if (enableML) {
spdlog::info("Initializing ML-enhanced market maker");
diff --git a/strategies/analytics/CrossMarketCorrelation.cpp b/strategies/analytics/CrossMarketCorrelation.cpp
new file mode 100644
index 0000000..c72971a
--- /dev/null
+++ b/strategies/analytics/CrossMarketCorrelation.cpp
@@ -0,0 +1,425 @@
+#include "CrossMarketCorrelation.h"
+
+#include
+#include
+#include
+#include
+
+namespace pinnacle {
+namespace analytics {
+
+CrossMarketCorrelation::CrossMarketCorrelation(const CrossMarketConfig& config)
+ : m_config(config) {}
+
+void CrossMarketCorrelation::addPriceObservation(const std::string& symbol,
+ double price, double volume,
+ uint64_t timestamp) {
+ std::lock_guard lock(m_dataMutex);
+
+ auto& series = m_series[symbol];
+ series.prices.push_back(price);
+ series.volumes.push_back(volume);
+ series.timestamps.push_back(timestamp);
+
+ // Compute log return (guard against non-positive prices)
+ if (series.prices.size() >= 2) {
+ size_t n = series.prices.size();
+ if (series.prices[n - 2] > 0.0 && series.prices[n - 1] > 0.0) {
+ double ret = std::log(series.prices[n - 1] / series.prices[n - 2]);
+ series.returns.push_back(ret);
+ }
+ }
+
+ // Trim to window size (keep 2x for lead-lag analysis)
+ size_t maxSize = m_config.returnWindowSize * 2;
+ while (series.prices.size() > maxSize) {
+ series.prices.pop_front();
+ series.volumes.pop_front();
+ series.timestamps.pop_front();
+ }
+ while (series.returns.size() > maxSize) {
+ series.returns.pop_front();
+ }
+
+ // Update all pairs involving this symbol
+ m_signalsDirty = true;
+ for (auto& [key, pair] : m_pairs) {
+ if (key.symbolA == symbol || key.symbolB == symbol) {
+ updatePair(key);
+ }
+ }
+}
+
+void CrossMarketCorrelation::addPair(const std::string& symbolA,
+ const std::string& symbolB) {
+ std::lock_guard lock(m_dataMutex);
+
+ PairKey key{symbolA, symbolB};
+ if (m_pairs.count(key)) {
+ return;
+ }
+
+ CorrelationPair pair;
+ pair.symbolA = symbolA;
+ pair.symbolB = symbolB;
+ m_pairs[key] = pair;
+ m_signalsDirty = true;
+
+ // Initialize metrics immediately if both symbols already have data
+ if (m_series.count(symbolA) && m_series.count(symbolB)) {
+ updatePair(key);
+ }
+}
+
+void CrossMarketCorrelation::removePair(const std::string& symbolA,
+ const std::string& symbolB) {
+ std::lock_guard lock(m_dataMutex);
+ m_pairs.erase(PairKey{symbolA, symbolB});
+ m_signalsDirty = true;
+}
+
+CorrelationPair
+CrossMarketCorrelation::getCorrelation(const std::string& symbolA,
+ const std::string& symbolB) const {
+ std::lock_guard lock(m_dataMutex);
+ auto it = m_pairs.find(PairKey{symbolA, symbolB});
+ if (it != m_pairs.end()) {
+ return it->second;
+ }
+ return CorrelationPair{symbolA, symbolB};
+}
+
+std::vector
+CrossMarketCorrelation::getActiveSignals() const {
+ std::lock_guard lock(m_dataMutex);
+ if (m_signalsDirty) {
+ updateSignals();
+ }
+ return m_signals;
+}
+
+std::vector
+CrossMarketCorrelation::getAllCorrelations() const {
+ std::lock_guard lock(m_dataMutex);
+ std::vector result;
+ result.reserve(m_pairs.size());
+ for (const auto& [key, pair] : m_pairs) {
+ result.push_back(pair);
+ }
+ return result;
+}
+
+std::string CrossMarketCorrelation::getStatistics() const {
+ std::lock_guard lock(m_dataMutex);
+
+ std::ostringstream oss;
+ oss << "CrossMarketCorrelation Statistics:\n";
+ oss << " Tracked symbols: " << m_series.size() << "\n";
+ oss << " Registered pairs: " << m_pairs.size() << "\n";
+
+ for (const auto& [key, pair] : m_pairs) {
+ oss << " " << pair.symbolA << "/" << pair.symbolB << ":\n";
+ oss << " Pearson: " << pair.pearsonCorrelation << "\n";
+ oss << " Rolling: " << pair.rollingCorrelation << "\n";
+ oss << " Lead-lag: " << pair.leadLagBarsA << " bars"
+ << " (coeff=" << pair.leadLagCoefficient << ")\n";
+ oss << " Cointegration: " << pair.cointegrationScore
+ << (pair.isCointegrated ? " [cointegrated]" : "") << "\n";
+ }
+
+ return oss.str();
+}
+
+// --- Statistical computation methods ---
+
+double CrossMarketCorrelation::computePearsonCorrelation(
+ const std::deque& x, const std::deque& y) const {
+ size_t n = std::min(x.size(), y.size());
+ if (n < 3) {
+ return 0.0;
+ }
+
+ // Use last n elements
+ double sumX = 0, sumY = 0, sumXY = 0, sumX2 = 0, sumY2 = 0;
+
+ for (size_t i = 0; i < n; ++i) {
+ size_t xi = x.size() - n + i;
+ size_t yi = y.size() - n + i;
+ sumX += x[xi];
+ sumY += y[yi];
+ sumXY += x[xi] * y[yi];
+ sumX2 += x[xi] * x[xi];
+ sumY2 += y[yi] * y[yi];
+ }
+
+ double denomSq = (n * sumX2 - sumX * sumX) * (n * sumY2 - sumY * sumY);
+ if (denomSq <= 0.0) {
+ return 0.0;
+ }
+ double denom = std::sqrt(denomSq);
+ if (denom < 1e-15) {
+ return 0.0;
+ }
+
+ return (n * sumXY - sumX * sumY) / denom;
+}
+
+double CrossMarketCorrelation::computeRollingCorrelation(
+ const std::deque& x, const std::deque& y) const {
+ size_t window = m_config.rollingWindowSize;
+ size_t n = std::min({x.size(), y.size(), window});
+ if (n < 3) {
+ return 0.0;
+ }
+
+ // Use the last 'n' elements
+ std::deque xWindow(x.end() - n, x.end());
+ std::deque yWindow(y.end() - n, y.end());
+
+ return computePearsonCorrelation(xWindow, yWindow);
+}
+
+CrossMarketCorrelation::LeadLagResult
+CrossMarketCorrelation::computeLeadLag(const std::deque& x,
+ const std::deque& y) const {
+ LeadLagResult result;
+ double bestCorr = 0.0;
+ int maxLag = m_config.maxLagBars;
+
+ size_t n = std::min(x.size(), y.size());
+ if (n < static_cast(maxLag + 3)) {
+ return result;
+ }
+
+ for (int lag = -maxLag; lag <= maxLag; ++lag) {
+ // Compute correlation with offset
+ size_t effectiveN = n - std::abs(lag);
+ if (effectiveN < 3) {
+ continue;
+ }
+
+ double sumX = 0, sumY = 0, sumXY = 0, sumX2 = 0, sumY2 = 0;
+
+ for (size_t i = 0; i < effectiveN; ++i) {
+ // Use n (not effectiveN) as the base window to avoid double-offsetting.
+ // For lag >= 0: correlate x[t] with y[t+lag], t = 0..effectiveN-1
+ // For lag < 0: correlate x[t+|lag|] with y[t], t = 0..effectiveN-1
+ size_t xi = (lag >= 0) ? (x.size() - n + i) : (x.size() - n + i - lag);
+ size_t yi = (lag >= 0) ? (y.size() - n + i + lag) : (y.size() - n + i);
+
+ if (xi >= x.size() || yi >= y.size()) {
+ continue;
+ }
+
+ sumX += x[xi];
+ sumY += y[yi];
+ sumXY += x[xi] * y[yi];
+ sumX2 += x[xi] * x[xi];
+ sumY2 += y[yi] * y[yi];
+ }
+
+ double denomSq =
+ (effectiveN * sumX2 - sumX * sumX) * (effectiveN * sumY2 - sumY * sumY);
+ if (denomSq <= 0.0) {
+ continue;
+ }
+ double denom = std::sqrt(denomSq);
+ if (denom < 1e-15) {
+ continue;
+ }
+
+ double corr = (effectiveN * sumXY - sumX * sumY) / denom;
+
+ if (std::abs(corr) > std::abs(bestCorr)) {
+ bestCorr = corr;
+ result.bestLag = lag;
+ result.coefficient = corr;
+ }
+ }
+
+ return result;
+}
+
+double CrossMarketCorrelation::computeCointegration(
+ const std::deque& pricesA,
+ const std::deque& pricesB) const {
+ // Simplified Engle-Granger: OLS regression of A on B, then ADF on residuals
+ size_t n = std::min(pricesA.size(), pricesB.size());
+ if (n < 20) {
+ return 0.0;
+ }
+
+ // Step 1: OLS regression Y = alpha + beta * X
+ double sumX = 0, sumY = 0, sumXY = 0, sumX2 = 0;
+
+ for (size_t i = 0; i < n; ++i) {
+ size_t ai = pricesA.size() - n + i;
+ size_t bi = pricesB.size() - n + i;
+ sumX += pricesB[bi];
+ sumY += pricesA[ai];
+ sumXY += pricesA[ai] * pricesB[bi];
+ sumX2 += pricesB[bi] * pricesB[bi];
+ }
+
+ double denom = n * sumX2 - sumX * sumX;
+ if (std::abs(denom) < 1e-15) {
+ return 0.0;
+ }
+
+ double beta = (n * sumXY - sumX * sumY) / denom;
+ double alpha = (sumY - beta * sumX) / n;
+
+ // Step 2: Compute residuals
+ std::vector residuals(n);
+ for (size_t i = 0; i < n; ++i) {
+ size_t ai = pricesA.size() - n + i;
+ size_t bi = pricesB.size() - n + i;
+ residuals[i] = pricesA[ai] - alpha - beta * pricesB[bi];
+ }
+
+ // Step 3: Simplified ADF test on residuals
+ // delta_e(t) = gamma * e(t-1) + noise
+ // t-stat for gamma
+ double sumE1 = 0, sumDE = 0, sumE1DE = 0, sumE12 = 0;
+ for (size_t i = 1; i < n; ++i) {
+ double e1 = residuals[i - 1];
+ double de = residuals[i] - residuals[i - 1];
+ sumE1 += e1;
+ sumDE += de;
+ sumE12 += e1 * e1;
+ sumE1DE += e1 * de;
+ }
+
+ size_t m = n - 1;
+ double gammaDenom = m * sumE12 - sumE1 * sumE1;
+ if (std::abs(gammaDenom) < 1e-15) {
+ return 0.0;
+ }
+
+ double gamma = (m * sumE1DE - sumE1 * sumDE) / gammaDenom;
+
+ // Compute standard error of gamma
+ double gammaAlpha = (sumDE - gamma * sumE1) / m;
+ double sse = 0.0;
+ for (size_t i = 1; i < n; ++i) {
+ double predicted = gammaAlpha + gamma * residuals[i - 1];
+ double actual = residuals[i] - residuals[i - 1];
+ double err = actual - predicted;
+ sse += err * err;
+ }
+ double se = std::sqrt(sse / (m - 2));
+ double varianceTerm = sumE12 - sumE1 * sumE1 / m;
+ if (varianceTerm < 1e-15) {
+ return 0.0;
+ }
+ double seGamma = se / std::sqrt(varianceTerm);
+
+ if (seGamma < 1e-15) {
+ return 0.0;
+ }
+
+ // t-statistic (more negative = stronger cointegration)
+ return gamma / seGamma;
+}
+
+void CrossMarketCorrelation::updatePair(const PairKey& key) {
+ // Must be called with m_dataMutex held
+
+ auto itA = m_series.find(key.symbolA);
+ auto itB = m_series.find(key.symbolB);
+
+ if (itA == m_series.end() || itB == m_series.end()) {
+ return;
+ }
+
+ auto& seriesA = itA->second;
+ auto& seriesB = itB->second;
+
+ auto pairIt = m_pairs.find(key);
+ if (pairIt == m_pairs.end()) {
+ return;
+ }
+
+ auto& pair = pairIt->second;
+
+ // Pearson correlation on returns
+ if (seriesA.returns.size() >= 10 && seriesB.returns.size() >= 10) {
+ pair.pearsonCorrelation =
+ computePearsonCorrelation(seriesA.returns, seriesB.returns);
+ pair.rollingCorrelation =
+ computeRollingCorrelation(seriesA.returns, seriesB.returns);
+
+ auto leadLag = computeLeadLag(seriesA.returns, seriesB.returns);
+ pair.leadLagCoefficient = leadLag.coefficient;
+ pair.leadLagBarsA = leadLag.bestLag;
+ }
+
+ // Cointegration on prices
+ if (seriesA.prices.size() >= 20 && seriesB.prices.size() >= 20) {
+ pair.cointegrationScore =
+ computeCointegration(seriesA.prices, seriesB.prices);
+
+ // Approximate critical values for Engle-Granger (2 variables)
+ // At 5% significance: ~ -3.37
+ pair.isCointegrated = pair.cointegrationScore < -3.37;
+ }
+}
+
+void CrossMarketCorrelation::updateSignals() const {
+ // Must be called with m_dataMutex held
+
+ m_signals.clear();
+
+ for (const auto& [key, pair] : m_pairs) {
+ // Only generate signals for pairs with sufficient lead-lag relationship
+ if (std::abs(pair.leadLagCoefficient) < m_config.minCorrelation) {
+ continue;
+ }
+
+ if (pair.leadLagBarsA == 0) {
+ continue; // No lead-lag detected
+ }
+
+ // Determine which symbol leads
+ std::string leadSym, lagSym;
+ if (pair.leadLagBarsA > 0) {
+ leadSym = pair.symbolA;
+ lagSym = pair.symbolB;
+ } else {
+ leadSym = pair.symbolB;
+ lagSym = pair.symbolA;
+ }
+
+ // Get the leader's recent return
+ auto leadIt = m_series.find(leadSym);
+ if (leadIt == m_series.end() || leadIt->second.returns.empty()) {
+ continue;
+ }
+
+ double leaderReturn = leadIt->second.returns.back();
+ double signalStrength =
+ std::abs(pair.leadLagCoefficient) * std::abs(leaderReturn);
+
+ if (signalStrength < m_config.signalThreshold) {
+ continue;
+ }
+
+ CrossMarketSignal signal;
+ signal.leadSymbol = leadSym;
+ signal.lagSymbol = lagSym;
+ signal.signalStrength = std::min(1.0, signalStrength);
+ signal.expectedMove = leaderReturn * pair.leadLagCoefficient;
+ signal.confidence = std::abs(pair.rollingCorrelation);
+
+ if (!leadIt->second.timestamps.empty()) {
+ signal.timestamp = leadIt->second.timestamps.back();
+ }
+
+ m_signals.push_back(signal);
+ }
+
+ m_signalsDirty = false;
+}
+
+} // namespace analytics
+} // namespace pinnacle
diff --git a/strategies/analytics/CrossMarketCorrelation.h b/strategies/analytics/CrossMarketCorrelation.h
new file mode 100644
index 0000000..6817210
--- /dev/null
+++ b/strategies/analytics/CrossMarketCorrelation.h
@@ -0,0 +1,172 @@
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace pinnacle {
+namespace analytics {
+
+/**
+ * @struct CorrelationPair
+ * @brief Statistics for a pair of correlated instruments
+ */
+struct CorrelationPair {
+ std::string symbolA;
+ std::string symbolB;
+ double pearsonCorrelation{0.0}; // [-1, 1]
+ double rollingCorrelation{0.0}; // recent-window correlation
+ double leadLagCoefficient{0.0}; // strength of lead-lag relationship
+ int leadLagBarsA{0}; // positive = A leads B
+ double cointegrationScore{
+ 0.0}; // Engle-Granger t-stat (more negative = stronger)
+ bool isCointegrated{false};
+};
+
+/**
+ * @struct CrossMarketSignal
+ * @brief Trading signal from cross-market analysis
+ */
+struct CrossMarketSignal {
+ std::string leadSymbol;
+ std::string lagSymbol;
+ double signalStrength{0.0}; // [0, 1]
+ double expectedMove{0.0}; // expected % move in lag symbol
+ double confidence{0.0}; // [0, 1]
+ uint64_t timestamp{0};
+};
+
+/**
+ * @struct CrossMarketConfig
+ * @brief Configuration for cross-market correlation analysis
+ */
+struct CrossMarketConfig {
+ size_t returnWindowSize{100}; // window for return calculations
+ size_t rollingWindowSize{30}; // rolling correlation window
+ int maxLagBars{10}; // max lead-lag offset to test
+ double minCorrelation{0.5}; // min absolute correlation to consider
+ double signalThreshold{0.3}; // min signal strength to emit
+ double cointegrationPValue{0.05}; // significance level for cointegration
+};
+
+/**
+ * @class CrossMarketCorrelation
+ * @brief Statistical models to detect when one instrument's movement predicts
+ * another's
+ *
+ * Implements Pearson correlation, rolling correlation, lead-lag analysis,
+ * and Engle-Granger cointegration test for pairs of instruments.
+ */
+class CrossMarketCorrelation {
+public:
+ explicit CrossMarketCorrelation(const CrossMarketConfig& config = {});
+ ~CrossMarketCorrelation() = default;
+
+ CrossMarketCorrelation(const CrossMarketCorrelation&) = delete;
+ CrossMarketCorrelation& operator=(const CrossMarketCorrelation&) = delete;
+
+ /**
+ * @brief Add a price observation for a symbol
+ * @param symbol Trading symbol
+ * @param price Current price
+ * @param volume Current volume
+ * @param timestamp Nanosecond timestamp
+ */
+ void addPriceObservation(const std::string& symbol, double price,
+ double volume, uint64_t timestamp);
+
+ /**
+ * @brief Register a pair for correlation tracking
+ * @param symbolA First symbol
+ * @param symbolB Second symbol
+ */
+ void addPair(const std::string& symbolA, const std::string& symbolB);
+
+ /**
+ * @brief Remove a pair
+ */
+ void removePair(const std::string& symbolA, const std::string& symbolB);
+
+ /**
+ * @brief Get correlation statistics for a pair
+ */
+ CorrelationPair getCorrelation(const std::string& symbolA,
+ const std::string& symbolB) const;
+
+ /**
+ * @brief Get all currently active cross-market signals
+ */
+ std::vector getActiveSignals() const;
+
+ /**
+ * @brief Get all registered pairs with their correlations
+ */
+ std::vector getAllCorrelations() const;
+
+ /**
+ * @brief Get statistics string
+ */
+ std::string getStatistics() const;
+
+private:
+ CrossMarketConfig m_config;
+
+ // Per-symbol price series
+ struct PriceSeries {
+ std::deque prices;
+ std::deque returns; // log returns
+ std::deque volumes;
+ std::deque timestamps;
+ };
+
+ mutable std::mutex m_dataMutex;
+ std::unordered_map m_series;
+
+ // Registered pairs
+ struct PairKey {
+ std::string symbolA;
+ std::string symbolB;
+ bool operator==(const PairKey& other) const {
+ return symbolA == other.symbolA && symbolB == other.symbolB;
+ }
+ };
+
+ struct PairKeyHash {
+ size_t operator()(const PairKey& k) const {
+ return std::hash{}(k.symbolA) ^
+ (std::hash{}(k.symbolB) << 1);
+ }
+ };
+
+ std::unordered_map m_pairs;
+
+ // Cached signals
+ mutable std::vector m_signals;
+ mutable bool m_signalsDirty{true};
+
+ // Statistical computation methods
+ double computePearsonCorrelation(const std::deque& x,
+ const std::deque& y) const;
+
+ double computeRollingCorrelation(const std::deque& x,
+ const std::deque& y) const;
+
+ struct LeadLagResult {
+ double coefficient{0.0};
+ int bestLag{0};
+ };
+ LeadLagResult computeLeadLag(const std::deque& x,
+ const std::deque& y) const;
+
+ double computeCointegration(const std::deque& pricesA,
+ const std::deque& pricesB) const;
+
+ void updatePair(const PairKey& key);
+ void updateSignals() const;
+};
+
+} // namespace analytics
+} // namespace pinnacle
diff --git a/strategies/arbitrage/ArbitrageDetector.cpp b/strategies/arbitrage/ArbitrageDetector.cpp
new file mode 100644
index 0000000..478ed10
--- /dev/null
+++ b/strategies/arbitrage/ArbitrageDetector.cpp
@@ -0,0 +1,272 @@
+#include "ArbitrageDetector.h"
+
+#include
+#include
+#include
+#include
+
+namespace pinnacle {
+namespace arbitrage {
+
+ArbitrageDetector::ArbitrageDetector(const ArbitrageConfig& config)
+ : m_config(config) {}
+
+ArbitrageDetector::~ArbitrageDetector() { stop(); }
+
+bool ArbitrageDetector::start() {
+ bool expected = false;
+ if (!m_running.compare_exchange_strong(expected, true,
+ std::memory_order_acq_rel,
+ std::memory_order_acquire)) {
+ return true; // Already running
+ }
+
+ try {
+ m_scanThread = std::thread(&ArbitrageDetector::scanLoop, this);
+ } catch (const std::exception& e) {
+ m_running.store(false, std::memory_order_release);
+ spdlog::error("ArbitrageDetector failed to start scan thread: {}",
+ e.what());
+ return false;
+ }
+
+ spdlog::info("ArbitrageDetector started (scanInterval={}ms dryRun={})",
+ m_config.scanIntervalMs, m_config.dryRun);
+ return true;
+}
+
+bool ArbitrageDetector::stop() {
+ bool expected = true;
+ if (!m_running.compare_exchange_strong(expected, false,
+ std::memory_order_acq_rel,
+ std::memory_order_acquire)) {
+ return true; // Already stopped
+ }
+
+ if (m_scanThread.joinable()) {
+ m_scanThread.join();
+ }
+ spdlog::info("ArbitrageDetector stopped (total opportunities={})",
+ m_totalOpportunities.load(std::memory_order_relaxed));
+ return true;
+}
+
+bool ArbitrageDetector::isRunning() const {
+ return m_running.load(std::memory_order_acquire);
+}
+
+void ArbitrageDetector::updateVenueQuote(const std::string& venue,
+ const std::string& symbol, double bid,
+ double bidSize, double ask,
+ double askSize, uint64_t timestamp) {
+ VenueQuote quote;
+ quote.bidPrice = bid;
+ quote.bidSize = bidSize;
+ quote.askPrice = ask;
+ quote.askSize = askSize;
+ quote.timestamp = timestamp;
+
+ std::lock_guard lock(m_quotesMutex);
+ m_quotes[venue][symbol] = quote;
+}
+
+std::vector
+ArbitrageDetector::getCurrentOpportunities() const {
+ std::lock_guard lock(m_opportunitiesMutex);
+ return m_opportunities;
+}
+
+void ArbitrageDetector::setOpportunityCallback(OpportunityCallback callback) {
+ std::lock_guard lock(m_callbackMutex);
+ m_callback = std::move(callback);
+}
+
+std::string ArbitrageDetector::getStatistics() const {
+ std::ostringstream oss;
+ oss << "ArbitrageDetector Statistics:\n";
+ oss << " Total scans: " << m_totalScans.load(std::memory_order_relaxed)
+ << "\n";
+ oss << " Total opportunities: "
+ << m_totalOpportunities.load(std::memory_order_relaxed) << "\n";
+ oss << " Dry run: " << (m_config.dryRun ? "yes" : "no") << "\n";
+ oss << " Min spread: " << m_config.minSpreadBps << " bps\n";
+
+ auto currentOpps = getCurrentOpportunities();
+ oss << " Active opportunities: " << currentOpps.size() << "\n";
+ for (const auto& opp : currentOpps) {
+ oss << " " << opp.symbol << ": buy@" << opp.buyVenue << " "
+ << opp.buyPrice << " sell@" << opp.sellVenue << " " << opp.sellPrice
+ << " spread=" << opp.spreadBps << "bps profit=$" << opp.estimatedProfit
+ << "\n";
+ }
+
+ return oss.str();
+}
+
+uint64_t ArbitrageDetector::getTotalOpportunitiesDetected() const {
+ return m_totalOpportunities.load(std::memory_order_relaxed);
+}
+
+void ArbitrageDetector::scanLoop() {
+ while (m_running.load(std::memory_order_acquire)) {
+ std::vector allOpps;
+
+ for (const auto& symbol : m_config.symbols) {
+ auto opps = detectOpportunities(symbol);
+ allOpps.insert(allOpps.end(), opps.begin(), opps.end());
+ }
+
+ m_totalScans.fetch_add(1, std::memory_order_relaxed);
+
+ // Save a copy for callbacks before moving into shared state
+ size_t newOppsCount = allOpps.size();
+ std::vector callbackOpps;
+ if (newOppsCount > 0) {
+ callbackOpps = allOpps;
+ }
+
+ // Update current opportunities
+ {
+ std::lock_guard lock(m_opportunitiesMutex);
+ m_opportunities = std::move(allOpps);
+ }
+
+ // Fire callbacks outside any lock to prevent deadlock
+ // (callback may call getCurrentOpportunities() which locks
+ // m_opportunitiesMutex)
+ if (newOppsCount > 0) {
+ m_totalOpportunities.fetch_add(newOppsCount, std::memory_order_relaxed);
+
+ OpportunityCallback cb;
+ {
+ std::lock_guard