Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,14 @@

import com.cleanengine.coin.order.application.dto.OrderCommand;
import com.cleanengine.coin.order.application.dto.OrderInfo;
import com.cleanengine.coin.common.error.BusinessException;
import com.cleanengine.coin.common.response.ErrorStatus;
import com.cleanengine.coin.order.adapter.out.persistentce.order.command.BuyOrderRepository;
import com.cleanengine.coin.order.adapter.out.persistentce.order.command.SellOrderRepository;
import com.cleanengine.coin.order.application.strategy.CreateOrderStrategy;
import com.cleanengine.coin.order.domain.BuyOrder;
import com.cleanengine.coin.order.domain.Order;
import com.cleanengine.coin.order.domain.SellOrder;
import jakarta.validation.Valid;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
Expand Down Expand Up @@ -39,4 +46,5 @@ public OrderInfo<?> createOrderWithBot(String ticker, Boolean isBuyOrder, Double

return createOrder(createOrder);
}

}
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package com.cleanengine.coin.trade.application;

import com.cleanengine.coin.chart.dto.TradeEventDto;
import com.cleanengine.coin.order.domain.spi.WaitingOrdersManager;
import com.cleanengine.coin.orderbook.application.service.UpdateOrderBookUsecase;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
Expand All @@ -21,28 +19,21 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@Service
@Order(4)
@Slf4j
@RequiredArgsConstructor
@Service
public class TradeBatchProcessor implements ApplicationRunner {

Logger logger = LoggerFactory.getLogger(TradeBatchProcessor.class);

private final WaitingOrdersManager waitingOrdersManager;
private final TradeService tradeService;
private final TradeFlowService tradeFlowService;
private final List<ExecutorService> executors = new ArrayList<>();

@Getter
private final Map<String, TradeQueueManager> tradeQueueManagers = new HashMap<>();
private final UpdateOrderBookUsecase updateOrderBookUsecase;

@Value("${order.tickers}") String[] tickers;

public TradeBatchProcessor(WaitingOrdersManager waitingOrdersManager, TradeService tradeService, UpdateOrderBookUsecase updateOrderBookUsecase) {
this.waitingOrdersManager = waitingOrdersManager;
this.tradeService = tradeService;
this.updateOrderBookUsecase = updateOrderBookUsecase;
}

@Override
public void run(ApplicationArguments args) {
processTrades();
Expand All @@ -51,8 +42,7 @@ public void run(ApplicationArguments args) {
private void processTrades() {
for (String ticker : tickers) {
TradeQueueManager tradeQueueManager = new TradeQueueManager(waitingOrdersManager.getWaitingOrders(ticker),
updateOrderBookUsecase,
tradeService);
tradeFlowService);
tradeQueueManagers.put(ticker, tradeQueueManager); // 정상 종료를 위해 저장

ExecutorService tradeExecutor = Executors.newSingleThreadExecutor(r -> {
Expand All @@ -66,7 +56,7 @@ private void processTrades() {
try {
tradeQueueManager.run();
} catch (Exception e) {
logger.error("Error in trade loop for {}: {}",ticker, e.getMessage());
log.error("Error in trade loop for {}: {}",ticker, e.getMessage());
}
});
}
Expand All @@ -87,7 +77,7 @@ public void shutdown() {
executor.shutdownNow();
// 추가로 1초 더 대기
if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
System.err.println("스레드풀이 완전히 종료되지 않았습니다");
log.error("스레드풀이 완전히 종료되지 않았습니다");
}
}
} catch (InterruptedException e) {
Expand All @@ -97,9 +87,4 @@ public void shutdown() {
}
}

@Deprecated
public TradeEventDto retrieveTradeEventDto(String ticker) {
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,21 @@
@Getter
@Builder
public class TradeExecutedEvent {

Trade trade;

Long buyOrderId;

Long sellOrderId;

private TradeExecutedEvent(Trade trade, Long buyOrderId, Long sellOrderId) {
this.trade = trade;
this.buyOrderId = buyOrderId;
this.sellOrderId = sellOrderId;
}

public static TradeExecutedEvent of(Trade trade, Long buyOrderId, Long sellOrderId) {
return new TradeExecutedEvent(trade, buyOrderId, sellOrderId);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package com.cleanengine.coin.trade.application;

import com.cleanengine.coin.common.error.BusinessException;
import com.cleanengine.coin.common.response.ErrorStatus;
import com.cleanengine.coin.order.application.OrderService;
import com.cleanengine.coin.order.domain.BuyOrder;
import com.cleanengine.coin.order.domain.Order;
import com.cleanengine.coin.order.domain.OrderStatus;
import com.cleanengine.coin.order.domain.SellOrder;
import com.cleanengine.coin.order.domain.spi.WaitingOrders;
import com.cleanengine.coin.trade.entity.Trade;
import com.cleanengine.coin.user.domain.Account;
import com.cleanengine.coin.user.domain.Wallet;
import com.cleanengine.coin.user.info.application.AccountService;
import com.cleanengine.coin.user.info.application.WalletService;
import jakarta.transaction.Transactional;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

import static com.cleanengine.coin.common.CommonValues.approxEquals;

@Slf4j
@RequiredArgsConstructor
@Component
public class TradeExecutor {

private final WalletService walletService;
private final AccountService accountService;
@Getter
private final TradeExecutedEventPublisher tradeExecutedEventPublisher;
private final TradeService tradeService;

@Transactional
public void executeTrade(WaitingOrders waitingOrders, TradePair<Order, Order> tradePair, String ticker) {
BuyOrder buyOrder = tradePair.getBuyOrder();
SellOrder sellOrder = tradePair.getSellOrder();

double tradedPrice;
double tradedSize;
double totalTradedPrice;

// 체결 단가, 수량 확정
TradeUnitPriceAndSize tradeUnitPriceAndSize = getTradeUnitPriceAndSize(buyOrder, sellOrder);
tradedSize = tradeUnitPriceAndSize.tradedSize();
tradedPrice = tradeUnitPriceAndSize.tradedPrice();
if (approxEquals(tradedSize, 0.0)) {
log.debug("체결 중단! 체결 시도 수량 : {}, 매수단가 : {}, 매도단가 : {}", tradedSize, buyOrder.getPrice(), sellOrder.getPrice());
return;
}
this.writeTradingLog(buyOrder, sellOrder);

totalTradedPrice = tradedPrice * tradedSize;
// 주문 잔여수량, 잔여금액 감소
if (isMarketOrder(buyOrder))
buyOrder.decreaseRemainingDeposit(totalTradedPrice);
else
buyOrder.decreaseRemainingSize(tradedSize);
sellOrder.decreaseRemainingSize(tradedSize);

// 주문 완전체결 처리(잔여금액 or 잔여수량이 0)
this.removeCompletedBuyOrder(waitingOrders, buyOrder);
this.removeCompletedSellOrder(waitingOrders, sellOrder);

tradeService.updateOrder(buyOrder);
tradeService.updateOrder(sellOrder);

// 예수금 처리
// - 매수 잔여금액 반환
if (!isMarketOrder(buyOrder) && buyOrder.getPrice() > tradedPrice) { // 매도 호가보다 높은 가격에 매수를 시도한 경우, 차액 반환
double totalRefundAmount = (buyOrder.getPrice() - tradedPrice) * tradedSize;
this.increaseAccountCash(buyOrder, totalRefundAmount);
}

// - 매도 예수금 처리
this.increaseAccountCash(sellOrder, totalTradedPrice);

// 지갑 누적계산
this.updateWalletAfterTrade(buyOrder, ticker, tradedSize, totalTradedPrice);
this.updateWalletAfterTrade(sellOrder, ticker, tradedSize, totalTradedPrice);

// 체결내역 저장
Trade trade = this.insertNewTrade(ticker, buyOrder, sellOrder, tradedSize, tradedPrice);

TradeExecutedEvent tradeExecutedEvent = TradeExecutedEvent.of(trade, buyOrder.getId(), sellOrder.getId());
tradeExecutedEventPublisher.publish(tradeExecutedEvent);
}

public void increaseAccountCash(Order order, Double amount) {
Account account = accountService.findAccountByUserId(order.getUserId()).orElseThrow();
accountService.save(account.increaseCash(amount));
}

public void updateWalletAfterTrade(Order order, String ticker, double tradedSize, double totalTradedPrice) {
if (order instanceof BuyOrder) {
Wallet buyerWallet = walletService.findWalletByUserIdAndTicker(order.getUserId(), ticker);
double updatedBuySize = buyerWallet.getSize() + tradedSize;
double currentBuyPrice = buyerWallet.getBuyPrice() == null ? 0.0 : buyerWallet.getBuyPrice();
double updatedBuyPrice = ((currentBuyPrice * buyerWallet.getSize()) + totalTradedPrice) / updatedBuySize;
buyerWallet.setSize(updatedBuySize);
buyerWallet.setBuyPrice(updatedBuyPrice);
// TODO : ROI 계산
walletService.save(buyerWallet);
} else if (order instanceof SellOrder) {
// 매도 시에는 평단가 변동 없음
Wallet sellerWallet = walletService.findWalletByUserIdAndTicker(order.getUserId(), ticker);
walletService.save(sellerWallet);
} else {
throw new BusinessException("Unsupported order type: " + order.getClass().getName(), ErrorStatus.INTERNAL_SERVER_ERROR);
}
}

public Trade insertNewTrade(String ticker, BuyOrder buyOrder, SellOrder sellOrder, double tradeSize, Double tradePrice) {
Trade newTrade = Trade.of(ticker, LocalDateTime.now(), buyOrder.getUserId(), sellOrder.getUserId(), tradePrice, tradeSize);

return tradeService.save(newTrade);
}

private static TradeUnitPriceAndSize getTradeUnitPriceAndSize(BuyOrder buyOrder, SellOrder sellOrder) {
double tradedPrice;
double tradedSize;
if (isMarketOrder(buyOrder)) { // 시장가매수-지정가매도
tradedPrice = sellOrder.getPrice();
if (buyOrder.getRemainingDeposit() >= tradedPrice * sellOrder.getRemainingSize()) { // 매수 잔여예수금이 매도 잔여량보다 크거나 같은 경우 (매수 부분체결 or 완전체결, 매도 완전체결)
tradedSize = sellOrder.getRemainingSize();
} else {
tradedSize = buyOrder.getRemainingDeposit() / tradedPrice;
}
} else if (isMarketOrder(sellOrder)) { // 시장가매도-지정가매수
tradedPrice = buyOrder.getPrice();
tradedSize = Math.min(sellOrder.getRemainingSize(), buyOrder.getRemainingSize());
} else { // 지정가매수-지정가매도
tradedPrice = getTradedUnitPrice(buyOrder, sellOrder);
tradedSize = Math.min(buyOrder.getRemainingSize(), sellOrder.getRemainingSize());
}
return new TradeUnitPriceAndSize(tradedSize, tradedPrice);
}

private record TradeUnitPriceAndSize(double tradedSize, double tradedPrice) {
}

private static double getTradedUnitPrice(BuyOrder buyOrder, SellOrder sellOrder) {
// 주문 시간을 비교하여 먼저 들어온 주문의 가격으로 거래
if (buyOrder.getCreatedAt().isBefore(sellOrder.getCreatedAt())) {
return buyOrder.getPrice();
} else {
return sellOrder.getPrice();
}
}

private void writeTradingLog(BuyOrder buyOrder, SellOrder sellOrder) {
log.debug("[{}] 체결 확정! 종목: {}, ({}: {}가 {}로 {}만큼 매수주문), ({}: {}가 {}로 {}만큼 매도주문)",
Thread.currentThread().threadId(),
buyOrder.getTicker(),
buyOrder.getId(),
buyOrder.getUserId(),
isMarketOrder(buyOrder) ? "시장가" : "지정가(" + buyOrder.getPrice() + "원)",
buyOrder.getRemainingSize() == null ? buyOrder.getRemainingDeposit() : buyOrder.getRemainingSize(),
sellOrder.getId(),
sellOrder.getUserId(),
isMarketOrder(sellOrder) ? "시장가" : "지정가(" + sellOrder.getPrice() + "원)",
sellOrder.getRemainingSize());
}

private void removeCompletedBuyOrder(WaitingOrders waitingOrders, BuyOrder order) {
boolean isOrderCompleted = (isMarketOrder(order) && approxEquals(order.getRemainingDeposit(), 0.0)) ||
(isLimitOrder(order) && approxEquals(order.getRemainingSize(), 0.0));

if (isOrderCompleted) {
waitingOrders.removeOrder(order);
this.updateCompletedOrderStatus(order);
}
}

private void removeCompletedSellOrder(WaitingOrders waitingOrders, SellOrder order) {
boolean isOrderCompleted = approxEquals(order.getRemainingSize(), 0.0);

if (isOrderCompleted) {
waitingOrders.removeOrder(order);
this.updateCompletedOrderStatus(order);
}
}

public void updateCompletedOrderStatus(Order order) {
order.setState(OrderStatus.DONE);
}

private static boolean isMarketOrder(Order order) {
return order.getIsMarketOrder();
}

private static boolean isLimitOrder(Order order) {
return !order.getIsMarketOrder();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.cleanengine.coin.trade.application;

import com.cleanengine.coin.order.domain.Order;
import com.cleanengine.coin.order.domain.spi.WaitingOrders;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.util.Optional;

@Slf4j
@RequiredArgsConstructor
@Transactional
@Component
public class TradeFlowService {

private final TradeMatcher tradeMatcher;
private final TradeExecutor tradeExecutor;

public void execMatchAndTrade(String ticker) {
WaitingOrders waitingOrders = tradeMatcher.getWaitingOrders(ticker);
// TODO : peek() 해온 Order 객체들을 lock -> 체결 도중 취소 방지
Optional<TradePair<Order, Order>> tradePair = tradeMatcher.matchOrders(waitingOrders);

tradePair.ifPresent(orderOrderTradePair -> tradeExecutor.executeTrade(waitingOrders, orderOrderTradePair, ticker));
}

}
Loading