diff --git a/build.gradle b/build.gradle index f01680fb..00228524 100644 --- a/build.gradle +++ b/build.gradle @@ -77,6 +77,13 @@ dependencies { testRuntimeOnly 'org.junit.platform:junit-platform-launcher' testImplementation 'org.junit.platform:junit-platform-suite:1.10.0' + + // QueryDSL + implementation 'com.querydsl:querydsl-jpa:5.1.0:jakarta' + annotationProcessor 'com.querydsl:querydsl-apt:5.1.0:jakarta' + annotationProcessor 'jakarta.persistence:jakarta.persistence-api' + annotationProcessor 'jakarta.annotation:jakarta.annotation-api' + // Spring Security + OAuth2 implementation 'org.springframework.boot:spring-boot-starter-security' implementation 'org.springframework.boot:spring-boot-starter-oauth2-client' diff --git a/src/main/java/com/cleanengine/coin/trade/application/TradeExecutedNotificationHandler.java b/src/main/java/com/cleanengine/coin/trade/application/TradeExecutedNotificationHandler.java index 8d94a666..7583e2e6 100644 --- a/src/main/java/com/cleanengine/coin/trade/application/TradeExecutedNotificationHandler.java +++ b/src/main/java/com/cleanengine/coin/trade/application/TradeExecutedNotificationHandler.java @@ -1,6 +1,6 @@ package com.cleanengine.coin.trade.application; -import com.cleanengine.coin.trade.entity.Trade; +import com.cleanengine.coin.order.domain.Order; import lombok.extern.slf4j.Slf4j; import org.springframework.messaging.simp.SimpMessagingTemplate; import org.springframework.stereotype.Component; @@ -23,30 +23,23 @@ public TradeExecutedNotificationHandler(SimpMessagingTemplate messagingTemplate) } @TransactionalEventListener - public void notifyAfterTradeExecuted(TradeExecutedEvent tradeExecutedEvent) { - Trade trade = tradeExecutedEvent.getTrade(); - if (trade == null) { - log.error("체결 알림 실패! trade == null"); + public void notifyAfterTradeExecuted(TradeOrderCompletedEvent tradeOrderCompletedEvent) { + // TODO : 평균단가는 별도 계산해야 함 + Order order = tradeOrderCompletedEvent.getOrder(); + if (order == null) { + log.error("체결 알림 실패! order == null"); return ; } - Integer sellUserId = trade.getSellUserId(); - Integer buyUserId = trade.getBuyUserId(); - if (sellUserId == null || buyUserId == null) { - log.error("체결 알림 실패! sellUserId: {}, buyUserId: {}", sellUserId, buyUserId); + Integer userId = order.getUserId(); + if (userId == null) { + log.error("체결 알림 실패! userId: {}", userId); return ; } - if (sellUserId != SELL_ORDER_BOT_ID) { - TradeExecutedNotifyDto soldDto = TradeExecutedNotifyDto.of(trade, ASK); - messagingTemplate.convertAndSend("/topic/tradeNotification/" + sellUserId, soldDto); - } - if (buyUserId != BUY_ORDER_BOT_ID) { - TradeExecutedNotifyDto boughtDto = TradeExecutedNotifyDto.of(trade, BID); - messagingTemplate.convertAndSend("/topic/tradeNotification/" + buyUserId, boughtDto); - } - if (sellUserId != SELL_ORDER_BOT_ID || buyUserId != BUY_ORDER_BOT_ID) { - log.debug("{} 체결 이벤트 구독 : {}원에 {}개, 매수인: {}, 매도인: {}", trade.getTicker(), trade.getPrice(), trade.getSize(), buyUserId, sellUserId ); + if (userId != SELL_ORDER_BOT_ID && userId != BUY_ORDER_BOT_ID) { + TradeOrderCompletedNotifyDto notifyDto = TradeOrderCompletedNotifyDto.of(order); + messagingTemplate.convertAndSend("/topic/tradeNotification/" + userId, notifyDto); } } diff --git a/src/main/java/com/cleanengine/coin/trade/application/TradeExecutedNotifyDto.java b/src/main/java/com/cleanengine/coin/trade/application/TradeExecutedNotifyDto.java deleted file mode 100644 index 10585a79..00000000 --- a/src/main/java/com/cleanengine/coin/trade/application/TradeExecutedNotifyDto.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.cleanengine.coin.trade.application; - -import com.cleanengine.coin.trade.entity.Trade; -import com.fasterxml.jackson.annotation.JsonPropertyOrder; -import lombok.Builder; -import lombok.Getter; - -import java.time.LocalDateTime; - -@Getter -@JsonPropertyOrder({"ticker", "price", "size", "type", "tradedTime"}) -public class TradeExecutedNotifyDto { - - private String ticker; - - private Double price; - - private Double size; - - private String type; - - private LocalDateTime tradedTime; - - @Builder - private TradeExecutedNotifyDto(String ticker, Double price, Double size, String type, LocalDateTime tradedTime) { - this.ticker = ticker; - this.price = price; - this.size = size; - this.type = type; - this.tradedTime = tradedTime; - } - - public static TradeExecutedNotifyDto of(Trade trade, String type) { - return TradeExecutedNotifyDto.builder() - .ticker(trade.getTicker()) - .price(trade.getPrice()) - .size(trade.getSize()) - .type(type) - .tradedTime(trade.getTradeTime()) - .build(); - } - -} diff --git a/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java b/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java index b8e4ca14..9b86a714 100644 --- a/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java +++ b/src/main/java/com/cleanengine/coin/trade/application/TradeExecutor.java @@ -1,15 +1,11 @@ package com.cleanengine.coin.trade.application; -import com.cleanengine.coin.common.error.BusinessException; -import com.cleanengine.coin.common.response.ErrorStatus; import com.cleanengine.coin.order.domain.BuyOrder; import com.cleanengine.coin.order.domain.Order; import com.cleanengine.coin.order.domain.OrderStatus; import com.cleanengine.coin.order.domain.SellOrder; import com.cleanengine.coin.order.domain.spi.WaitingOrders; import com.cleanengine.coin.trade.entity.Trade; -import com.cleanengine.coin.user.domain.Account; -import com.cleanengine.coin.user.domain.Wallet; import com.cleanengine.coin.user.info.application.AccountService; import com.cleanengine.coin.user.info.application.WalletService; import lombok.Getter; @@ -33,10 +29,11 @@ public class TradeExecutor { private final AccountService accountService; @Getter private final TradeExecutedEventPublisher tradeExecutedEventPublisher; + private final TradeOrderCompletedEventPublisher tradeOrderCompletedEventPublisher; private final TradeService tradeService; @Transactional(propagation = Propagation.REQUIRES_NEW, isolation = Isolation.READ_COMMITTED) - public void executeTrade(WaitingOrders waitingOrders, TradePair tradePair, String ticker) { + public Trade executeTrade(WaitingOrders waitingOrders, TradePair tradePair, String ticker) { BuyOrder buyOrder = tradePair.getBuyOrder(); SellOrder sellOrder = tradePair.getSellOrder(); log.trace("{} - 체결 시작: 매수[{} {}원 {}개] / 매도[{} {}원 {}개]", ticker, buyOrder.getId(), buyOrder.getPrice(), buyOrder.getRemainingSize(), @@ -64,8 +61,7 @@ public void executeTrade(WaitingOrders waitingOrders, TradePair tr sellOrder.decreaseRemainingSize(tradedSize); // 주문 완전체결 처리(잔여금액 or 잔여수량이 0) - removeCompletedBuyOrder(waitingOrders, buyOrder); - removeCompletedSellOrder(waitingOrders, sellOrder); + removeCompletedOrders(waitingOrders, buyOrder, sellOrder); tradeService.updateOrder(buyOrder); tradeService.updateOrder(sellOrder); @@ -82,14 +78,14 @@ public void executeTrade(WaitingOrders waitingOrders, TradePair tr } // 지갑 누적계산 - this.updateWalletAfterTrade(buyOrder, ticker, tradedSize, totalTradedPrice); - this.updateWalletAfterTrade(sellOrder, ticker, tradedSize, totalTradedPrice); + walletService.updateWalletAfterTrade(buyOrder, ticker, tradedSize, totalTradedPrice); // 체결내역 저장 - Trade trade = this.insertNewTrade(ticker, buyOrder, sellOrder, tradedSize, tradedPrice); + Trade trade = Trade.of(ticker, LocalDateTime.now(), buyOrder.getUserId(), sellOrder.getUserId(), tradedPrice, tradedSize); TradeExecutedEvent tradeExecutedEvent = TradeExecutedEvent.of(trade, buyOrder.getId(), sellOrder.getId()); tradeExecutedEventPublisher.publish(tradeExecutedEvent); + return trade; } private static void checkZeroOrderAndThrowException(BuyOrder buyOrder, SellOrder sellOrder) { @@ -105,35 +101,13 @@ else if (approxEquals(sellOrder.getRemainingSize(), 0.0)) } private void increaseAccountCash(Order order, Double amount) { - Account account = accountService.findAccountByUserId(order.getUserId()).orElseThrow(); - accountService.save(account.increaseCash(amount)); - } + int updatedRows = accountService.increaseAccountCash(order.getUserId(), amount); - private void updateWalletAfterTrade(Order order, String ticker, double tradedSize, double totalTradedPrice) { - if (order instanceof BuyOrder) { - Wallet buyerWallet = walletService.findWalletByUserIdAndTicker(order.getUserId(), ticker); - double updatedBuySize = buyerWallet.getSize() + tradedSize; - double currentBuyPrice = buyerWallet.getBuyPrice() == null ? 0.0 : buyerWallet.getBuyPrice(); - double updatedBuyPrice = ((currentBuyPrice * buyerWallet.getSize()) + totalTradedPrice) / updatedBuySize; - buyerWallet.setSize(updatedBuySize); - buyerWallet.setBuyPrice(updatedBuyPrice); - // TODO : ROI 계산 - walletService.save(buyerWallet); - } else if (order instanceof SellOrder) { - // 매도 시에는 평단가 변동 없음 - Wallet sellerWallet = walletService.findWalletByUserIdAndTicker(order.getUserId(), ticker); - walletService.save(sellerWallet); - } else { - throw new BusinessException("Unsupported order type: " + order.getClass().getName(), ErrorStatus.INTERNAL_SERVER_ERROR); + if (updatedRows == 0) { + throw new RuntimeException("account updatedRows == 0"); } } - private Trade insertNewTrade(String ticker, BuyOrder buyOrder, SellOrder sellOrder, double tradeSize, Double tradePrice) { - Trade newTrade = Trade.of(ticker, LocalDateTime.now(), buyOrder.getUserId(), sellOrder.getUserId(), tradePrice, tradeSize); - - return tradeService.save(newTrade); - } - private static TradeUnitPriceAndSize getTradeUnitPriceAndSize(BuyOrder buyOrder, SellOrder sellOrder) { double tradedPrice; double tradedSize; @@ -180,25 +154,32 @@ private static void writeTradingLog(BuyOrder buyOrder, SellOrder sellOrder) { sellOrder.getRemainingSize()); } - private static void removeCompletedBuyOrder(WaitingOrders waitingOrders, BuyOrder order) { - boolean isOrderCompleted = (isMarketOrder(order) && approxEquals(order.getRemainingDeposit(), 0.0)) || - (isLimitOrder(order) && approxEquals(order.getRemainingSize(), 0.0)); - - if (isOrderCompleted) { - waitingOrders.removeOrder(order); - updateCompletedOrderStatus(order); - } + private void removeCompletedOrders(WaitingOrders waitingOrders, BuyOrder buyOrder, SellOrder sellOrder) { + removeCompletedOrder(waitingOrders, buyOrder); + removeCompletedOrder(waitingOrders, sellOrder); } - private static void removeCompletedSellOrder(WaitingOrders waitingOrders, SellOrder order) { - boolean isOrderCompleted = approxEquals(order.getRemainingSize(), 0.0); + private void removeCompletedOrder(WaitingOrders waitingOrders, Order order) { + boolean isOrderCompleted = false; + + if (order instanceof BuyOrder buyOrder) { + isOrderCompleted = (isMarketOrder(buyOrder) && approxEquals(buyOrder.getRemainingDeposit(), 0.0)) || + (isLimitOrder(buyOrder) && approxEquals(buyOrder.getRemainingSize(), 0.0)); + } else if (order instanceof SellOrder sellOrder) { + isOrderCompleted = approxEquals(sellOrder.getRemainingSize(), 0.0); + } if (isOrderCompleted) { waitingOrders.removeOrder(order); updateCompletedOrderStatus(order); + publishOrderCompletionEvent(order); } } + private void publishOrderCompletionEvent(Order order) { + tradeOrderCompletedEventPublisher.publish(TradeOrderCompletedEventImpl.of(order)); + } + private static void updateCompletedOrderStatus(Order order) { order.setState(OrderStatus.DONE); } diff --git a/src/main/java/com/cleanengine/coin/trade/application/TradeFlowService.java b/src/main/java/com/cleanengine/coin/trade/application/TradeFlowService.java index f0880bbb..6ebf2fa7 100644 --- a/src/main/java/com/cleanengine/coin/trade/application/TradeFlowService.java +++ b/src/main/java/com/cleanengine/coin/trade/application/TradeFlowService.java @@ -4,11 +4,17 @@ import com.cleanengine.coin.order.domain.Order; import com.cleanengine.coin.order.domain.spi.WaitingOrders; import com.cleanengine.coin.order.domain.spi.WaitingOrdersManager; +import com.cleanengine.coin.trade.entity.Trade; +import com.cleanengine.coin.trade.repository.TradeRepository; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Component; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; +import java.util.concurrent.CountDownLatch; @Slf4j @RequiredArgsConstructor @@ -18,16 +24,31 @@ public class TradeFlowService { private final TradeMatcher tradeMatcher; private final TradeExecutor tradeExecutor; private final WaitingOrdersManager waitingOrdersManager; + private final TradeRepository tradeRepository; + + private CountDownLatch testLatch; // 테스트용 후크 + + @Profile("trade-load-test") + public void setTestLatch(CountDownLatch latch) { + this.testLatch = latch; + } public void execMatchAndTrade(String ticker) { WaitingOrders waitingOrders = waitingOrdersManager.getWaitingOrders(ticker); // TODO : peek() 해온 Order 객체들을 lock -> 체결 도중 취소 방지 Optional> tradePair = tradeMatcher.matchOrders(waitingOrders); boolean continueProcessing = tradePair.isPresent(); + List tradesToSave = new ArrayList<>(); while (continueProcessing) { try { - tradeExecutor.executeTrade(waitingOrders, tradePair.get(), ticker); + Trade trade = tradeExecutor.executeTrade(waitingOrders, tradePair.get(), ticker); + tradesToSave.add(trade); + if (tradesToSave.size() > 10000) { + tradeRepository.saveAll(tradesToSave); + tradesToSave.clear(); + } + tradePair = tradeMatcher.matchOrders(waitingOrders); continueProcessing = tradePair.isPresent(); } catch (TradeZeroOrderException e) { @@ -41,6 +62,15 @@ public void execMatchAndTrade(String ticker) { continueProcessing = false; } } + + if (!tradesToSave.isEmpty()) { + tradeRepository.saveAll(tradesToSave); + tradesToSave.clear(); + } + + if (testLatch != null) { + testLatch.countDown(); + } } } diff --git a/src/main/java/com/cleanengine/coin/trade/application/TradeOrderCompletedEvent.java b/src/main/java/com/cleanengine/coin/trade/application/TradeOrderCompletedEvent.java new file mode 100644 index 00000000..02f25821 --- /dev/null +++ b/src/main/java/com/cleanengine/coin/trade/application/TradeOrderCompletedEvent.java @@ -0,0 +1,9 @@ +package com.cleanengine.coin.trade.application; + +import com.cleanengine.coin.order.domain.Order; + +public interface TradeOrderCompletedEvent { + + Order getOrder(); + +} diff --git a/src/main/java/com/cleanengine/coin/trade/application/TradeOrderCompletedEventImpl.java b/src/main/java/com/cleanengine/coin/trade/application/TradeOrderCompletedEventImpl.java new file mode 100644 index 00000000..be72ae38 --- /dev/null +++ b/src/main/java/com/cleanengine/coin/trade/application/TradeOrderCompletedEventImpl.java @@ -0,0 +1,22 @@ +package com.cleanengine.coin.trade.application; + +import com.cleanengine.coin.order.domain.Order; +import com.cleanengine.coin.trade.entity.Trade; +import lombok.Builder; +import lombok.Getter; + +@Getter +@Builder +public class TradeOrderCompletedEventImpl implements TradeOrderCompletedEvent { + + Order order; + + private TradeOrderCompletedEventImpl(Order order) { + this.order = order; + } + + public static TradeOrderCompletedEventImpl of(Order order) { + return new TradeOrderCompletedEventImpl(order); + } + +} diff --git a/src/main/java/com/cleanengine/coin/trade/application/TradeOrderCompletedEventPublisher.java b/src/main/java/com/cleanengine/coin/trade/application/TradeOrderCompletedEventPublisher.java new file mode 100644 index 00000000..fa8e6a93 --- /dev/null +++ b/src/main/java/com/cleanengine/coin/trade/application/TradeOrderCompletedEventPublisher.java @@ -0,0 +1,19 @@ +package com.cleanengine.coin.trade.application; + +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.stereotype.Service; + +@Service +public class TradeOrderCompletedEventPublisher { + + private final ApplicationEventPublisher publisher; + + public TradeOrderCompletedEventPublisher(ApplicationEventPublisher publisher) { + this.publisher = publisher; + } + + public void publish(TradeOrderCompletedEvent event) { + publisher.publishEvent(event); + } + +} diff --git a/src/main/java/com/cleanengine/coin/trade/application/TradeOrderCompletedNotifyDto.java b/src/main/java/com/cleanengine/coin/trade/application/TradeOrderCompletedNotifyDto.java new file mode 100644 index 00000000..9dc1e49c --- /dev/null +++ b/src/main/java/com/cleanengine/coin/trade/application/TradeOrderCompletedNotifyDto.java @@ -0,0 +1,37 @@ +package com.cleanengine.coin.trade.application; + +import com.cleanengine.coin.order.domain.BuyOrder; +import com.cleanengine.coin.order.domain.Order; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import lombok.Builder; +import lombok.Getter; + +import java.time.LocalDateTime; + +@Getter +@JsonPropertyOrder({"ticker", "size", "type"}) +public class TradeOrderCompletedNotifyDto { + + private String ticker; + + private Double size; + + private String type; + + @Builder + private TradeOrderCompletedNotifyDto(String ticker, Double price, Double size, String type, LocalDateTime tradedTime) { + this.ticker = ticker; + this.size = size; + this.type = type; + } + + public static TradeOrderCompletedNotifyDto of(Order order) { + String orderType = order instanceof BuyOrder ? "ASK" : "BID"; + return TradeOrderCompletedNotifyDto.builder() + .ticker(order.getTicker()) + .size(order.getOrderSize()) + .type(orderType) + .build(); + } + +} diff --git a/src/main/java/com/cleanengine/coin/user/info/application/AccountService.java b/src/main/java/com/cleanengine/coin/user/info/application/AccountService.java index af743b5d..af3ddf6e 100644 --- a/src/main/java/com/cleanengine/coin/user/info/application/AccountService.java +++ b/src/main/java/com/cleanengine/coin/user/info/application/AccountService.java @@ -55,4 +55,9 @@ public void resetBot(String ticker) { walletRepository.save(wallet); walletRepository.save(wallet2); } + + public int increaseAccountCash(int userId, double amount) { + return accountRepository.increaseAccountCash(userId, amount); + } + } diff --git a/src/main/java/com/cleanengine/coin/user/info/application/WalletService.java b/src/main/java/com/cleanengine/coin/user/info/application/WalletService.java index 10fbcef3..16867f18 100644 --- a/src/main/java/com/cleanengine/coin/user/info/application/WalletService.java +++ b/src/main/java/com/cleanengine/coin/user/info/application/WalletService.java @@ -1,9 +1,14 @@ package com.cleanengine.coin.user.info.application; import com.cleanengine.coin.order.adapter.out.persistentce.asset.AssetRepository; +import com.cleanengine.coin.order.domain.BuyOrder; +import com.cleanengine.coin.user.domain.QAccount; +import com.cleanengine.coin.user.domain.QWallet; import com.cleanengine.coin.user.domain.Wallet; import com.cleanengine.coin.user.info.infra.AccountRepository; import com.cleanengine.coin.user.info.infra.WalletRepository; +import com.querydsl.jpa.impl.JPAQueryFactory; +import jakarta.persistence.EntityManager; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -15,11 +20,14 @@ public class WalletService { private final WalletRepository walletRepository; private final AccountRepository accountRepository; private final AssetRepository assetRepository; + private final JPAQueryFactory queryFactory; - public WalletService(WalletRepository walletRepository, AccountRepository accountRepository, AssetRepository assetRepository) { + public WalletService(WalletRepository walletRepository, AccountRepository accountRepository, + AssetRepository assetRepository, EntityManager entityManager) { this.walletRepository = walletRepository; this.accountRepository = accountRepository; this.assetRepository = assetRepository; + this.queryFactory = new JPAQueryFactory(entityManager); } @Transactional @@ -31,10 +39,13 @@ public Wallet save(Wallet wallet) { return walletRepository.save(wallet); } + public List saveAll(List wallets) { + return walletRepository.saveAll(wallets); + } + public Wallet findWalletByUserIdAndTicker(Integer userId, String ticker) { - int accountId = accountRepository.findByUserId(userId).orElseThrow().getId(); - return walletRepository.findByAccountIdAndTicker(accountId, ticker) - .orElseGet(() -> Wallet.of(ticker, accountId)); + return walletRepository.findByUserIdAndTicker(userId, ticker) + .orElseGet(() -> Wallet.of(ticker, accountRepository.findByUserId(userId).orElseThrow().getId())); } public void createNewWallets(Integer accountId) { @@ -44,4 +55,27 @@ public void createNewWallets(Integer accountId) { .forEach(walletRepository::save); } + @Transactional + public void updateWalletAfterTrade(BuyOrder buyOrder, String ticker, double tradedSize, double totalTradedPrice) { + QWallet wallet = QWallet.wallet; + QAccount account = QAccount.account; + + queryFactory + .update(wallet) + .where(wallet.accountId.eq( + queryFactory + .select(account.id) + .from(account) + .where(account.userId.eq(buyOrder.getUserId())) + ).and(wallet.ticker.eq(ticker))) + .set(wallet.size, wallet.size.add(tradedSize)) + .set(wallet.buyPrice, + wallet.buyPrice.coalesce(0.0) + .multiply(wallet.size) + .add(totalTradedPrice) + .divide(wallet.size.add(tradedSize)) + ) + .execute(); + } + } diff --git a/src/main/java/com/cleanengine/coin/user/info/infra/AccountRepository.java b/src/main/java/com/cleanengine/coin/user/info/infra/AccountRepository.java index 4c337cc5..05b056e3 100644 --- a/src/main/java/com/cleanengine/coin/user/info/infra/AccountRepository.java +++ b/src/main/java/com/cleanengine/coin/user/info/infra/AccountRepository.java @@ -4,6 +4,8 @@ import jakarta.persistence.LockModeType; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Lock; +import org.springframework.data.jpa.repository.Modifying; +import org.springframework.data.jpa.repository.Query; import java.util.Optional; @@ -12,4 +14,8 @@ public interface AccountRepository extends JpaRepository { @Lock(LockModeType.PESSIMISTIC_WRITE) Optional findByUserId(Integer userId); + @Modifying(flushAutomatically = true) + @Query("UPDATE Account a SET a.cash = a.cash + :amount WHERE a.userId = :userId") + int increaseAccountCash(int userId, double amount); + } diff --git a/src/main/java/com/cleanengine/coin/user/info/infra/WalletRepository.java b/src/main/java/com/cleanengine/coin/user/info/infra/WalletRepository.java index 25bf9a9b..da77c883 100644 --- a/src/main/java/com/cleanengine/coin/user/info/infra/WalletRepository.java +++ b/src/main/java/com/cleanengine/coin/user/info/infra/WalletRepository.java @@ -4,14 +4,25 @@ import jakarta.persistence.LockModeType; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Lock; +import org.springframework.data.jpa.repository.Query; import java.util.List; import java.util.Optional; public interface WalletRepository extends JpaRepository { + @Lock(LockModeType.PESSIMISTIC_WRITE) Optional findByAccountIdAndTicker(Integer accountId, String ticker); @Lock(LockModeType.PESSIMISTIC_WRITE) List findByAccountId(Integer accountId); + + @Lock(LockModeType.PESSIMISTIC_WRITE) + @Query("SELECT w FROM Wallet w JOIN Account a ON w.accountId = a.id WHERE a.userId = :userId AND w.ticker = :ticker") + Optional findByUserIdAndTicker(Integer userId, String ticker); + + @Lock(LockModeType.PESSIMISTIC_WRITE) + @Query("SELECT w, a.userId FROM Wallet w JOIN Account a ON w.accountId = a.id WHERE a.userId IN :userIds AND w.ticker = :ticker") + List findAllByUserIdsAndTicker(List userIds, String ticker); + } diff --git a/src/test/java/com/cleanengine/coin/trade/application/TradeExecuteLoadTest.java b/src/test/java/com/cleanengine/coin/trade/application/TradeExecuteLoadTest.java index 733a3a38..74de0550 100644 --- a/src/test/java/com/cleanengine/coin/trade/application/TradeExecuteLoadTest.java +++ b/src/test/java/com/cleanengine/coin/trade/application/TradeExecuteLoadTest.java @@ -19,11 +19,12 @@ import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -@ActiveProfiles({"dev", "it", "h2-mem"}) +import static org.assertj.core.api.Assertions.assertThat; + +@ActiveProfiles({"dev", "it", "mariadb-local", "trade-load-test", "actuator", "apm", "otel-local"}) @Disabled @TestMethodOrder(MethodOrderer.OrderAnnotation.class) @SpringBootTest @@ -38,6 +39,9 @@ class TradeExecuteLoadTest { @Autowired TradeRepository tradeRepository; + @Autowired + private TradeFlowService tradeFlowService; + @Autowired SellOrderRepository sellOrderRepository; @@ -49,10 +53,22 @@ class TradeExecuteLoadTest { @DisplayName("워밍업: Spring 컨텍스트 및 JVM 최적화") @Order(1) @Test +// @Disabled void warmUp() throws InterruptedException { - runSingleTest(1000); - runSingleTest(10000); - runSingleTest(50000); + System.out.println("Starting warmUp"); + int warmUpCount1 = 10; + for (int i = 0; i < warmUpCount1; i++) { + runSingleTest(1000); + } +// int warmUpCount2 = 5; +// for (int i = 0; i < warmUpCount2; i++) { +// runSingleTest(10000); +// } +// int warmUpCount3 = 5; +// for (int i = 0; i < warmUpCount3; i++) { +// runSingleTest(100000); +// } + System.out.println("Finished warmUp"); } @BeforeEach @@ -62,10 +78,13 @@ void setUp() { tradeRepository.deleteAll(); sellOrderRepository.deleteAll(); buyOrderRepository.deleteAll(); + tradeFlowService.setTestLatch(null); } + @DisplayName("매수, 매도 각 1000건에 대한 처리 성능을 10회 진행한다.") @Order(2) @Test +// @Disabled void basicLoadTestWith1000OrdersEachSide() throws InterruptedException { // given int orderCount = 1000; @@ -83,11 +102,19 @@ void basicLoadTestWith1000OrdersEachSide() throws InterruptedException { // 통계 출력 printStatistics(queueInsertTimes, executionTimes, orderCount); + + WaitingOrders waitingOrders = waitingOrdersManager.getWaitingOrders(ticker); + PriorityQueueStore buyOrderPriorityQueueStore = waitingOrders.getBuyOrderPriorityQueueStore(OrderType.LIMIT); + PriorityQueueStore sellOrderPriorityQueueStore = waitingOrders.getSellOrderPriorityQueueStore(OrderType.LIMIT); + + assertThat(sellOrderPriorityQueueStore.size()).isEqualTo(0); + assertThat(buyOrderPriorityQueueStore.size()).isEqualTo(0); } @DisplayName("매수, 매도 각 10000건에 대한 처리 성능을 10회 진행한다.") @Order(3) @Test + @Disabled void basicLoadTestWith10000OrdersEachSide() throws InterruptedException { // given int orderCount = 10000; @@ -105,11 +132,19 @@ void basicLoadTestWith10000OrdersEachSide() throws InterruptedException { // 통계 출력 printStatistics(queueInsertTimes, executionTimes, orderCount); + + WaitingOrders waitingOrders = waitingOrdersManager.getWaitingOrders(ticker); + PriorityQueueStore buyOrderPriorityQueueStore = waitingOrders.getBuyOrderPriorityQueueStore(OrderType.LIMIT); + PriorityQueueStore sellOrderPriorityQueueStore = waitingOrders.getSellOrderPriorityQueueStore(OrderType.LIMIT); + + assertThat(sellOrderPriorityQueueStore.size()).isEqualTo(0); + assertThat(buyOrderPriorityQueueStore.size()).isEqualTo(0); } @DisplayName("매수, 매도 각 100000건에 대한 처리 성능을 10회 진행한다.") @Order(4) @Test + @Disabled void basicLoadTestWith100000OrdersEachSide() throws InterruptedException { // given int orderCount = 100000; @@ -131,47 +166,60 @@ void basicLoadTestWith100000OrdersEachSide() throws InterruptedException { private long[] runSingleTest(int orderCount) throws InterruptedException { WaitingOrders waitingOrders = waitingOrdersManager.getWaitingOrders(ticker); + + // 큐와 DB 초기화 + waitingOrders.clearAllQueues(); + tradeRepository.deleteAll(); + sellOrderRepository.deleteAll(); + buyOrderRepository.deleteAll(); + + CountDownLatch latch = new CountDownLatch(1); + tradeFlowService.setTestLatch(latch); + long testStart = System.nanoTime(); // 주문 생성 및 큐 삽입 - ExecutorService executor = Executors.newFixedThreadPool(10); + final LocalDateTime baseTime = LocalDateTime.now(); + for (int i = 0; i < orderCount; i++) { - executor.submit(() -> { - SellOrder limitSellOrder = SellOrder.createLimitSellOrder(ticker, 1, 10.0, 130_000_000.0, LocalDateTime.now(), true); - BuyOrder limitBuyOrder = BuyOrder.createLimitBuyOrder(ticker, 2, 10.0, 130_000_000.0, LocalDateTime.now(), true); - waitingOrders.addOrder(limitSellOrder); - waitingOrders.addOrder(limitBuyOrder); - }); + final LocalDateTime orderTime = baseTime.plusSeconds(i); + + SellOrder limitSellOrder = SellOrder.createLimitSellOrder(ticker, 1, 10.0, 130_000_000.0, orderTime, true); + BuyOrder limitBuyOrder = BuyOrder.createLimitBuyOrder(ticker, 2, 10.0, 130_000_000.0, orderTime, true); + waitingOrders.addOrder(limitSellOrder); + waitingOrders.addOrder(limitBuyOrder); + + sellOrderRepository.save(limitSellOrder); + buyOrderRepository.save(limitBuyOrder); } - // 큐 삽입 완료 대기 - executor.shutdown(); - boolean queueTerminated = executor.awaitTermination(10, TimeUnit.SECONDS); + PriorityQueueStore buyOrderPriorityQueueStore = waitingOrders.getBuyOrderPriorityQueueStore(OrderType.LIMIT); + PriorityQueueStore sellOrderPriorityQueueStore = waitingOrders.getSellOrderPriorityQueueStore(OrderType.LIMIT); + long queueInsertEnd = System.nanoTime(); long queueInsertTime = (queueInsertEnd - testStart) / 1_000_000; - // 단일 이벤트 발행 (체결 시작) + // 단일 이벤트 발행 (체결 시작, 큐에는 안 넣음) long eventStart = System.nanoTime(); SellOrder dummyOrder = SellOrder.createLimitSellOrder(ticker, 1, 10.0, 130_000_000.0, LocalDateTime.now(), true); eventPublisher.publishEvent(new OrderInsertedToQueue(dummyOrder)); + + boolean completed = latch.await(2, TimeUnit.MINUTES); long eventEnd = System.nanoTime(); long executionTime = (eventEnd - eventStart) / 1_000_000; + // CountDownLatch 초기화 + tradeFlowService.setTestLatch(null); + // 결과 출력 - if (tradeRepository.findAll().size() != orderCount) { - PriorityQueueStore buyOrderPriorityQueueStore = waitingOrders.getBuyOrderPriorityQueueStore(OrderType.LIMIT); - PriorityQueueStore sellOrderPriorityQueueStore = waitingOrders.getSellOrderPriorityQueueStore(OrderType.LIMIT); - System.out.print("체결 종료 - 체결내역[: " + tradeRepository.findAll().size() + "건]"); - System.out.println("잔여 주문[매도 " + sellOrderPriorityQueueStore.size() + "건, 매수 " + buyOrderPriorityQueueStore.size() + "건]"); + long tradeCount = tradeRepository.count(); + System.out.print("체결 종료 - 체결내역[: " + tradeCount + "건]"); + System.out.println("잔여 주문[매도 " + sellOrderPriorityQueueStore.size() + "건, 매수 " + buyOrderPriorityQueueStore.size() + "건]"); + if (tradeCount != orderCount || !completed) { + System.out.println("경고: 예상 체결 건수(" + orderCount + "건)와 실제(" + tradeCount + "건) 불일치 또는 타임아웃"); } - // 큐와 DB 초기화 - waitingOrders.clearAllQueues(); - tradeRepository.deleteAll(); - sellOrderRepository.deleteAll(); - buyOrderRepository.deleteAll(); - return new long[]{queueInsertTime, executionTime}; } diff --git a/src/test/java/com/cleanengine/coin/trade/application/TradeExecutedNotificationHandlerTest.java b/src/test/java/com/cleanengine/coin/trade/application/TradeExecutedNotificationHandlerTest.java index d3793153..059721ac 100644 --- a/src/test/java/com/cleanengine/coin/trade/application/TradeExecutedNotificationHandlerTest.java +++ b/src/test/java/com/cleanengine/coin/trade/application/TradeExecutedNotificationHandlerTest.java @@ -6,7 +6,8 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.*; -import com.cleanengine.coin.trade.entity.Trade; +import com.cleanengine.coin.order.domain.BuyOrder; +import com.cleanengine.coin.order.domain.SellOrder; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -35,52 +36,38 @@ void setUp() { @Test void shouldSendNotificationsForValidTrade() { // given - Trade trade = Trade.of("BTC", LocalDateTime.now(), 3, SELL_ORDER_BOT_ID, 50000.0, 1.0); - TradeExecutedEvent event = TradeExecutedEvent.of(trade, null, null); + SellOrder sellOrder = SellOrder.createLimitSellOrder("BTC", 3, 5.0, 130_000_000.0, LocalDateTime.now(), false); + TradeOrderCompletedEvent event = TradeOrderCompletedEventImpl.of(sellOrder); // when handler.notifyAfterTradeExecuted(event); // then - verify(messagingTemplate, times(1)).convertAndSend(eq("/topic/tradeNotification/3"), any(TradeExecutedNotifyDto.class)); - verify(messagingTemplate).convertAndSend(eq("/topic/tradeNotification/3"), any(TradeExecutedNotifyDto.class)); + verify(messagingTemplate, times(1)).convertAndSend(eq("/topic/tradeNotification/3"), any(TradeOrderCompletedNotifyDto.class)); + verify(messagingTemplate).convertAndSend(eq("/topic/tradeNotification/3"), any(TradeOrderCompletedNotifyDto.class)); } @DisplayName("매수인은 봇인 정상 체결내역을 리스닝하면 웹소켓으로 전송한다.") @Test void shouldSendNotificationsForValidTrade2() { // given - Trade trade = Trade.of("BTC", LocalDateTime.now(), BUY_ORDER_BOT_ID, 3, 50000.0, 1.0); - TradeExecutedEvent event = TradeExecutedEvent.of(trade, null, null); + BuyOrder buyOrder = BuyOrder.createLimitBuyOrder("BTC", 4, 5.0, 130_000_000.0, LocalDateTime.now(), false); + TradeOrderCompletedEvent event = TradeOrderCompletedEventImpl.of(buyOrder); // when handler.notifyAfterTradeExecuted(event); // then - verify(messagingTemplate, times(1)).convertAndSend(eq("/topic/tradeNotification/3"), any(TradeExecutedNotifyDto.class)); - verify(messagingTemplate).convertAndSend(eq("/topic/tradeNotification/3"), any(TradeExecutedNotifyDto.class)); - } - - @DisplayName("매수인과 매도인의 userId가 null이면 메시지를 전송하지 않는다.") - @Test - void shouldNotSendNotificationForNullUserIds() { - // given - Trade trade = Trade.of("BTC", LocalDateTime.now(), null, null, 50000.0, 1.0); - TradeExecutedEvent event = TradeExecutedEvent.of(trade, null, null); - - // when - handler.notifyAfterTradeExecuted(event); - - // then - verifyNoInteractions(messagingTemplate); + verify(messagingTemplate, times(1)).convertAndSend(eq("/topic/tradeNotification/4"), any(TradeOrderCompletedNotifyDto.class)); + verify(messagingTemplate).convertAndSend(eq("/topic/tradeNotification/4"), any(TradeOrderCompletedNotifyDto.class)); } @DisplayName("매수인의 userId가 null이면 메시지를 전송하지 않는다.") @Test void shouldNotSendNotificationForNullBuyUserId() { // given - Trade trade = Trade.of("BTC", LocalDateTime.now(), null, SELL_ORDER_BOT_ID, 50000.0, 1.0); - TradeExecutedEvent event = TradeExecutedEvent.of(trade, null, null); + BuyOrder buyOrder = BuyOrder.createLimitBuyOrder("BTC", null, 5.0, 130_000_000.0, LocalDateTime.now(), false); + TradeOrderCompletedEvent event = TradeOrderCompletedEventImpl.of(buyOrder); // when handler.notifyAfterTradeExecuted(event); @@ -93,8 +80,8 @@ void shouldNotSendNotificationForNullBuyUserId() { @Test void shouldNotSendNotificationForNullSellUserId() { // given - Trade trade = Trade.of("BTC", LocalDateTime.now(), BUY_ORDER_BOT_ID, null, 50000.0, 1.0); - TradeExecutedEvent event = TradeExecutedEvent.of(trade, null, null); + SellOrder sellOrder = SellOrder.createLimitSellOrder("BTC", null, 5.0, 130_000_000.0, LocalDateTime.now(), false); + TradeOrderCompletedEvent event = TradeOrderCompletedEventImpl.of(sellOrder); // when handler.notifyAfterTradeExecuted(event); @@ -103,25 +90,28 @@ void shouldNotSendNotificationForNullSellUserId() { verifyNoInteractions(messagingTemplate); } - @DisplayName("봇끼리의 체결은 메시지를 전송하지 않는다.") + @DisplayName("봇의 체결은 메시지를 전송하지 않는다.") @Test void shouldNotSendNotificationForBotTrade() { // given - Trade trade = Trade.of("BTC", LocalDateTime.now(), BUY_ORDER_BOT_ID, SELL_ORDER_BOT_ID, 50000.0, 1.0); - TradeExecutedEvent event = TradeExecutedEvent.of(trade, null, null); + SellOrder sellOrder = SellOrder.createLimitSellOrder("BTC", SELL_ORDER_BOT_ID, 5.0, 130_000_000.0, LocalDateTime.now(), false); + BuyOrder buyOrder = BuyOrder.createLimitBuyOrder("BTC", BUY_ORDER_BOT_ID, 5.0, 130_000_000.0, LocalDateTime.now(), false); + TradeOrderCompletedEvent event = TradeOrderCompletedEventImpl.of(sellOrder); + TradeOrderCompletedEvent event2 = TradeOrderCompletedEventImpl.of(buyOrder); // when handler.notifyAfterTradeExecuted(event); + handler.notifyAfterTradeExecuted(event2); // then verifyNoInteractions(messagingTemplate); } - @DisplayName("체결이 null이면 메시지를 전송하지 않는다.") + @DisplayName("주문이 null이면 메시지를 전송하지 않는다.") @Test void shouldNotSendNotificationForNullTrade() { // given - TradeExecutedEvent event = TradeExecutedEvent.of(null, null, null); + TradeOrderCompletedEvent event = TradeOrderCompletedEventImpl.of(null); // when handler.notifyAfterTradeExecuted(event);