Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionalEventListener;

@Component
@RequiredArgsConstructor
Expand All @@ -25,7 +25,7 @@ public class TradeEventHandler {
private final RealTimeDataPrevRateService realTimeDataPrevRateService;
//event로 이벤틀 처리해야한다.
//eventListener는 void로 처리를 해야한다
@EventListener
@TransactionalEventListener
public void handleTradeEvent(TradeExecutedEvent event) {
Trade trade = event.getTrade();
if (trade == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
import com.cleanengine.coin.user.domain.Wallet;
import com.cleanengine.coin.user.info.application.AccountService;
import com.cleanengine.coin.user.info.application.WalletService;
import jakarta.transaction.Transactional;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import java.time.LocalDateTime;

Expand All @@ -33,7 +34,7 @@ public class TradeExecutor {
private final TradeExecutedEventPublisher tradeExecutedEventPublisher;
private final TradeService tradeService;

@Transactional
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void executeTrade(WaitingOrders waitingOrders, TradePair<Order, Order> tradePair, String ticker) {
BuyOrder buyOrder = tradePair.getBuyOrder();
SellOrder sellOrder = tradePair.getSellOrder();
Expand All @@ -48,7 +49,7 @@ public void executeTrade(WaitingOrders waitingOrders, TradePair<Order, Order> tr
tradedPrice = tradeUnitPriceAndSize.tradedPrice();
if (approxEquals(tradedSize, 0.0)) {
log.debug("체결 중단! 체결 시도 수량 : {}, 매수단가 : {}, 매도단가 : {}", tradedSize, buyOrder.getPrice(), sellOrder.getPrice());
return;
return ;
}
this.writeTradingLog(buyOrder, sellOrder);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@

import com.cleanengine.coin.order.domain.Order;
import com.cleanengine.coin.order.domain.spi.WaitingOrders;
import com.cleanengine.coin.order.domain.spi.WaitingOrdersManager;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import java.util.Optional;

Expand All @@ -17,14 +16,25 @@ public class TradeFlowService {

private final TradeMatcher tradeMatcher;
private final TradeExecutor tradeExecutor;
private final WaitingOrdersManager waitingOrdersManager;

@Transactional(propagation = Propagation.REQUIRES_NEW)
public void execMatchAndTrade(String ticker) {
WaitingOrders waitingOrders = tradeMatcher.getWaitingOrders(ticker);
WaitingOrders waitingOrders = waitingOrdersManager.getWaitingOrders(ticker);
// TODO : peek() 해온 Order 객체들을 lock -> 체결 도중 취소 방지
Optional<TradePair<Order, Order>> tradePair = tradeMatcher.matchOrders(waitingOrders);
boolean continueProcessing = tradePair.isPresent();

tradePair.ifPresent(orderOrderTradePair -> tradeExecutor.executeTrade(waitingOrders, orderOrderTradePair, ticker));
while (continueProcessing) {
try {
tradeExecutor.executeTrade(waitingOrders, tradePair.get(), ticker);
tradePair = tradeMatcher.matchOrders(waitingOrders);
continueProcessing = tradePair.isPresent();
} catch (Exception e) {
// TODO : 회복 필요
log.error("Error processing trades for {}: {}", ticker, e.getMessage());
continueProcessing = false;
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import com.cleanengine.coin.order.domain.OrderType;
import com.cleanengine.coin.order.domain.SellOrder;
import com.cleanengine.coin.order.domain.spi.WaitingOrders;
import com.cleanengine.coin.order.domain.spi.WaitingOrdersManager;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

Expand All @@ -15,20 +14,10 @@
@Component
public class TradeMatcher {

private final WaitingOrdersManager waitingOrdersManager;

// 1초마다 로깅
private long lastLogTime = 0;
private static final long LOG_INTERVAL = 1000;

public TradeMatcher(WaitingOrdersManager waitingOrdersManager) {
this.waitingOrdersManager = waitingOrdersManager;
}

public WaitingOrders getWaitingOrders(String ticker) {
return waitingOrdersManager.getWaitingOrders(ticker);
}

public Optional<TradePair<Order, Order>> matchOrders(WaitingOrders waitingOrders) { // 반환값 : 체결여부
this.writeQueueLog(waitingOrders);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.cleanengine.coin.trade.application;

import com.cleanengine.coin.order.application.event.OrderCreated;
import com.cleanengine.coin.order.application.event.OrderInsertedToQueue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
Expand All @@ -19,12 +18,9 @@ public TradeQueueManager(TradeFlowService tradeFlowService) {
}

@EventListener
public void handleOrderInserted(OrderInsertedToQueue event) {
try {
tradeFlowService.execMatchAndTrade(event.order().getTicker());
} catch (Exception e) {
log.error("Error processing trades for {}: {}", event.order().getTicker(), e.getMessage());
}
public void handleOrderInserted(OrderInsertedToQueue orderInsertedToQueue) {
String ticker = orderInsertedToQueue.order().getTicker();
tradeFlowService.execMatchAndTrade(ticker);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@

@ActiveProfiles({"dev", "it", "h2-mem"})
@SpringBootTest
@DisplayName("체결 처리 통합테스트")
@DisplayName("체결처리 h2 통합테스트")
@Disabled
class TradeFlowServiceTest {
class TradeFlowServiceIntegrationTest {

private static TradeBatchProcessor staticTradeBatchProcessor;

Expand All @@ -41,10 +41,9 @@ class TradeFlowServiceTest {
TradeBatchProcessor tradeBatchProcessor;
@Autowired
private WaitingOrdersManager waitingOrdersManager;
@Autowired
TradeMatcher tradeMatcher;

private final String ticker = "BTC";
private final WaitingOrders waitingOrders = waitingOrdersManager.getWaitingOrders(ticker);

@BeforeEach
void setUp() {
Expand Down Expand Up @@ -76,7 +75,6 @@ void testLimitToLimitCompleteTrade() {
buyOrderRepository.save(buyOrder);
sellOrderRepository.save(sellOrder);

WaitingOrders waitingOrders = tradeMatcher.getWaitingOrders(ticker);
waitingOrders.addOrder(buyOrder);
waitingOrders.addOrder(sellOrder);

Expand Down Expand Up @@ -122,7 +120,6 @@ void testLimitToLimitPartialTrade1() {
buyOrderRepository.save(buyOrder);
sellOrderRepository.save(sellOrder);

WaitingOrders waitingOrders = tradeMatcher.getWaitingOrders(ticker);
waitingOrders.addOrder(buyOrder);
waitingOrders.addOrder(sellOrder);

Expand Down Expand Up @@ -161,7 +158,6 @@ void testLimitToLimitPartialTrade2() {
buyOrderRepository.save(buyOrder);
sellOrderRepository.save(sellOrder);

WaitingOrders waitingOrders = tradeMatcher.getWaitingOrders(ticker);
waitingOrders.addOrder(buyOrder);
waitingOrders.addOrder(sellOrder);

Expand Down Expand Up @@ -200,7 +196,6 @@ void testMarketToLimitCompleteTrade1() {
buyOrderRepository.save(buyOrder);
sellOrderRepository.save(sellOrder);

WaitingOrders waitingOrders = tradeMatcher.getWaitingOrders(ticker);
waitingOrders.addOrder(buyOrder);
waitingOrders.addOrder(sellOrder);

Expand Down Expand Up @@ -239,7 +234,6 @@ void testMarketToLimitCompleteTrade2() {
buyOrderRepository.save(buyOrder);
sellOrderRepository.save(sellOrder);

WaitingOrders waitingOrders = tradeMatcher.getWaitingOrders(ticker);
waitingOrders.addOrder(buyOrder);
waitingOrders.addOrder(sellOrder);

Expand Down Expand Up @@ -278,7 +272,6 @@ void testMarketToLimitPartialTrade1() {
buyOrderRepository.save(buyOrder);
sellOrderRepository.save(sellOrder);

WaitingOrders waitingOrders = tradeMatcher.getWaitingOrders(ticker);
waitingOrders.addOrder(buyOrder);
waitingOrders.addOrder(sellOrder);

Expand Down Expand Up @@ -317,7 +310,6 @@ void testMarketToLimitPartialTrade2() {
buyOrderRepository.save(buyOrder);
sellOrderRepository.save(sellOrder);

WaitingOrders waitingOrders = tradeMatcher.getWaitingOrders(ticker);
waitingOrders.addOrder(buyOrder);
waitingOrders.addOrder(sellOrder);

Expand Down Expand Up @@ -361,7 +353,6 @@ void testMarketToLimitPartialTrade3() {
buyOrderRepository.save(buyOrder);
sellOrderRepository.save(sellOrder);

WaitingOrders waitingOrders = tradeMatcher.getWaitingOrders(ticker);
waitingOrders.addOrder(buyOrder);
waitingOrders.addOrder(sellOrder);

Expand Down Expand Up @@ -400,7 +391,6 @@ void testMarketToLimitPartialTrade4() {
buyOrderRepository.save(buyOrder);
sellOrderRepository.save(sellOrder);

WaitingOrders waitingOrders = tradeMatcher.getWaitingOrders(ticker);
waitingOrders.addOrder(buyOrder);
waitingOrders.addOrder(sellOrder);

Expand Down
Loading