diff --git a/src/main/java/com/cleanengine/coin/chart/handler/TradeEventHandler.java b/src/main/java/com/cleanengine/coin/chart/handler/TradeEventHandler.java index 7b5d9270..db294c80 100644 --- a/src/main/java/com/cleanengine/coin/chart/handler/TradeEventHandler.java +++ b/src/main/java/com/cleanengine/coin/chart/handler/TradeEventHandler.java @@ -12,8 +12,8 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; -import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; +import org.springframework.transaction.event.TransactionalEventListener; @Component @RequiredArgsConstructor @@ -25,7 +25,7 @@ public class TradeEventHandler { private final RealTimeDataPrevRateService realTimeDataPrevRateService; //event로 이벤틀 처리해야한다. //eventListener는 void로 처리를 해야한다 - @EventListener + @TransactionalEventListener public void handleTradeEvent(TradeExecutedEvent event) { Trade trade = event.getTrade(); if (trade == null) { diff --git a/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java b/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java index 1e96fbfa..a7256f70 100644 --- a/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java +++ b/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java @@ -12,11 +12,12 @@ import com.cleanengine.coin.user.domain.Wallet; import com.cleanengine.coin.user.info.application.AccountService; import com.cleanengine.coin.user.info.application.WalletService; -import jakarta.transaction.Transactional; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; import java.time.LocalDateTime; @@ -33,7 +34,7 @@ public class TradeExecutor { private final TradeExecutedEventPublisher tradeExecutedEventPublisher; private final TradeService tradeService; - @Transactional + @Transactional(propagation = Propagation.REQUIRES_NEW) public void executeTrade(WaitingOrders waitingOrders, TradePair tradePair, String ticker) { BuyOrder buyOrder = tradePair.getBuyOrder(); SellOrder sellOrder = tradePair.getSellOrder(); @@ -48,7 +49,7 @@ public void executeTrade(WaitingOrders waitingOrders, TradePair tr tradedPrice = tradeUnitPriceAndSize.tradedPrice(); if (approxEquals(tradedSize, 0.0)) { log.debug("체결 중단! 체결 시도 수량 : {}, 매수단가 : {}, 매도단가 : {}", tradedSize, buyOrder.getPrice(), sellOrder.getPrice()); - return; + return ; } this.writeTradingLog(buyOrder, sellOrder); diff --git a/src/main/java/com/cleanengine/coin/trade/application/TradeFlowService.java b/src/main/java/com/cleanengine/coin/trade/application/TradeFlowService.java index 5a7cab22..df367924 100644 --- a/src/main/java/com/cleanengine/coin/trade/application/TradeFlowService.java +++ b/src/main/java/com/cleanengine/coin/trade/application/TradeFlowService.java @@ -2,11 +2,10 @@ import com.cleanengine.coin.order.domain.Order; import com.cleanengine.coin.order.domain.spi.WaitingOrders; +import com.cleanengine.coin.order.domain.spi.WaitingOrdersManager; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Propagation; -import org.springframework.transaction.annotation.Transactional; import java.util.Optional; @@ -17,14 +16,25 @@ public class TradeFlowService { private final TradeMatcher tradeMatcher; private final TradeExecutor tradeExecutor; + private final WaitingOrdersManager waitingOrdersManager; - @Transactional(propagation = Propagation.REQUIRES_NEW) public void execMatchAndTrade(String ticker) { - WaitingOrders waitingOrders = tradeMatcher.getWaitingOrders(ticker); + WaitingOrders waitingOrders = waitingOrdersManager.getWaitingOrders(ticker); // TODO : peek() 해온 Order 객체들을 lock -> 체결 도중 취소 방지 Optional> tradePair = tradeMatcher.matchOrders(waitingOrders); + boolean continueProcessing = tradePair.isPresent(); - tradePair.ifPresent(orderOrderTradePair -> tradeExecutor.executeTrade(waitingOrders, orderOrderTradePair, ticker)); + while (continueProcessing) { + try { + tradeExecutor.executeTrade(waitingOrders, tradePair.get(), ticker); + tradePair = tradeMatcher.matchOrders(waitingOrders); + continueProcessing = tradePair.isPresent(); + } catch (Exception e) { + // TODO : 회복 필요 + log.error("Error processing trades for {}: {}", ticker, e.getMessage()); + continueProcessing = false; + } + } } } diff --git a/src/main/java/com/cleanengine/coin/trade/application/TradeMatcher.java b/src/main/java/com/cleanengine/coin/trade/application/TradeMatcher.java index 11bfa6ba..0fe3e3e4 100644 --- a/src/main/java/com/cleanengine/coin/trade/application/TradeMatcher.java +++ b/src/main/java/com/cleanengine/coin/trade/application/TradeMatcher.java @@ -5,7 +5,6 @@ import com.cleanengine.coin.order.domain.OrderType; import com.cleanengine.coin.order.domain.SellOrder; import com.cleanengine.coin.order.domain.spi.WaitingOrders; -import com.cleanengine.coin.order.domain.spi.WaitingOrdersManager; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -15,20 +14,10 @@ @Component public class TradeMatcher { - private final WaitingOrdersManager waitingOrdersManager; - // 1초마다 로깅 private long lastLogTime = 0; private static final long LOG_INTERVAL = 1000; - public TradeMatcher(WaitingOrdersManager waitingOrdersManager) { - this.waitingOrdersManager = waitingOrdersManager; - } - - public WaitingOrders getWaitingOrders(String ticker) { - return waitingOrdersManager.getWaitingOrders(ticker); - } - public Optional> matchOrders(WaitingOrders waitingOrders) { // 반환값 : 체결여부 this.writeQueueLog(waitingOrders); diff --git a/src/main/java/com/cleanengine/coin/trade/application/TradeQueueManager.java b/src/main/java/com/cleanengine/coin/trade/application/TradeQueueManager.java index 46d84d26..0b198e55 100644 --- a/src/main/java/com/cleanengine/coin/trade/application/TradeQueueManager.java +++ b/src/main/java/com/cleanengine/coin/trade/application/TradeQueueManager.java @@ -1,6 +1,5 @@ package com.cleanengine.coin.trade.application; -import com.cleanengine.coin.order.application.event.OrderCreated; import com.cleanengine.coin.order.application.event.OrderInsertedToQueue; import lombok.extern.slf4j.Slf4j; import org.springframework.context.event.EventListener; @@ -19,12 +18,9 @@ public TradeQueueManager(TradeFlowService tradeFlowService) { } @EventListener - public void handleOrderInserted(OrderInsertedToQueue event) { - try { - tradeFlowService.execMatchAndTrade(event.order().getTicker()); - } catch (Exception e) { - log.error("Error processing trades for {}: {}", event.order().getTicker(), e.getMessage()); - } + public void handleOrderInserted(OrderInsertedToQueue orderInsertedToQueue) { + String ticker = orderInsertedToQueue.order().getTicker(); + tradeFlowService.execMatchAndTrade(ticker); } } \ No newline at end of file diff --git a/src/test/java/com/cleanengine/coin/trade/application/TradeFlowServiceTest.java b/src/test/java/com/cleanengine/coin/trade/application/TradeFlowServiceIntegrationTest.java similarity index 96% rename from src/test/java/com/cleanengine/coin/trade/application/TradeFlowServiceTest.java rename to src/test/java/com/cleanengine/coin/trade/application/TradeFlowServiceIntegrationTest.java index f3ad8b87..fd75eb3a 100644 --- a/src/test/java/com/cleanengine/coin/trade/application/TradeFlowServiceTest.java +++ b/src/test/java/com/cleanengine/coin/trade/application/TradeFlowServiceIntegrationTest.java @@ -23,9 +23,9 @@ @ActiveProfiles({"dev", "it", "h2-mem"}) @SpringBootTest -@DisplayName("체결 처리 통합테스트") +@DisplayName("체결처리 h2 통합테스트") @Disabled -class TradeFlowServiceTest { +class TradeFlowServiceIntegrationTest { private static TradeBatchProcessor staticTradeBatchProcessor; @@ -41,10 +41,9 @@ class TradeFlowServiceTest { TradeBatchProcessor tradeBatchProcessor; @Autowired private WaitingOrdersManager waitingOrdersManager; - @Autowired - TradeMatcher tradeMatcher; private final String ticker = "BTC"; + private final WaitingOrders waitingOrders = waitingOrdersManager.getWaitingOrders(ticker); @BeforeEach void setUp() { @@ -76,7 +75,6 @@ void testLimitToLimitCompleteTrade() { buyOrderRepository.save(buyOrder); sellOrderRepository.save(sellOrder); - WaitingOrders waitingOrders = tradeMatcher.getWaitingOrders(ticker); waitingOrders.addOrder(buyOrder); waitingOrders.addOrder(sellOrder); @@ -122,7 +120,6 @@ void testLimitToLimitPartialTrade1() { buyOrderRepository.save(buyOrder); sellOrderRepository.save(sellOrder); - WaitingOrders waitingOrders = tradeMatcher.getWaitingOrders(ticker); waitingOrders.addOrder(buyOrder); waitingOrders.addOrder(sellOrder); @@ -161,7 +158,6 @@ void testLimitToLimitPartialTrade2() { buyOrderRepository.save(buyOrder); sellOrderRepository.save(sellOrder); - WaitingOrders waitingOrders = tradeMatcher.getWaitingOrders(ticker); waitingOrders.addOrder(buyOrder); waitingOrders.addOrder(sellOrder); @@ -200,7 +196,6 @@ void testMarketToLimitCompleteTrade1() { buyOrderRepository.save(buyOrder); sellOrderRepository.save(sellOrder); - WaitingOrders waitingOrders = tradeMatcher.getWaitingOrders(ticker); waitingOrders.addOrder(buyOrder); waitingOrders.addOrder(sellOrder); @@ -239,7 +234,6 @@ void testMarketToLimitCompleteTrade2() { buyOrderRepository.save(buyOrder); sellOrderRepository.save(sellOrder); - WaitingOrders waitingOrders = tradeMatcher.getWaitingOrders(ticker); waitingOrders.addOrder(buyOrder); waitingOrders.addOrder(sellOrder); @@ -278,7 +272,6 @@ void testMarketToLimitPartialTrade1() { buyOrderRepository.save(buyOrder); sellOrderRepository.save(sellOrder); - WaitingOrders waitingOrders = tradeMatcher.getWaitingOrders(ticker); waitingOrders.addOrder(buyOrder); waitingOrders.addOrder(sellOrder); @@ -317,7 +310,6 @@ void testMarketToLimitPartialTrade2() { buyOrderRepository.save(buyOrder); sellOrderRepository.save(sellOrder); - WaitingOrders waitingOrders = tradeMatcher.getWaitingOrders(ticker); waitingOrders.addOrder(buyOrder); waitingOrders.addOrder(sellOrder); @@ -361,7 +353,6 @@ void testMarketToLimitPartialTrade3() { buyOrderRepository.save(buyOrder); sellOrderRepository.save(sellOrder); - WaitingOrders waitingOrders = tradeMatcher.getWaitingOrders(ticker); waitingOrders.addOrder(buyOrder); waitingOrders.addOrder(sellOrder); @@ -400,7 +391,6 @@ void testMarketToLimitPartialTrade4() { buyOrderRepository.save(buyOrder); sellOrderRepository.save(sellOrder); - WaitingOrders waitingOrders = tradeMatcher.getWaitingOrders(ticker); waitingOrders.addOrder(buyOrder); waitingOrders.addOrder(sellOrder);