Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,51 +37,35 @@ public void publishRealTimeOhlc() {
try {
log.debug("△ 실시간 OHLC 데이터 스케줄러 실행");

// 구독된 티커가 없으면 조기 종료
if (subscriptionService.getAllRealTimeOhlcSubscribedTickers().isEmpty()) {
log.debug("실시간 OHLC 구독된 티커 없음, 전송 생략");
return;
}
final LocalDateTime now = LocalDateTime.now();

// 모든 구독된 티커에 대해 데이터 전송
for (String ticker : subscriptionService.getAllRealTimeOhlcSubscribedTickers()) {
try {
log.debug("티커 {} 실시간 OHLC 데이터 전송 중...", ticker);

// 티커별 최신 OHLC 데이터 조회 및 전송
RealTimeOhlcDto ohlcData = realTimeOhlcService.getRealTimeOhlc(ticker);
RealTimeOhlcDto ohlcData = realTimeOhlcService.getAndUpdateCumulative1mOhlc(ticker, now);

if (ohlcData == null) {
// 이전에 전송한 데이터가 있는지 확인
RealTimeOhlcDto lastSentData = lastSentOhlcDataMap.get(ticker);

if (lastSentData != null) {
// 이전 데이터가 있으면 타임스탬프만 업데이트하여 재사용
log.debug("티커 {}의 실시간 OHLC 데이터가 없습니다. 이전 데이터 재사용", ticker);
RealTimeOhlcDto updatedData = new RealTimeOhlcDto(lastSentData.getTicker(), LocalDateTime.now(), // 현재 시간으로 업데이트
RealTimeOhlcDto updatedData = new RealTimeOhlcDto(lastSentData.getTicker(), now,
lastSentData.getOpen(), lastSentData.getHigh(), lastSentData.getLow(), lastSentData.getClose(), lastSentData.getVolume());

messagingTemplate.convertAndSend("/topic/realTimeOhlc/" + ticker, updatedData);
lastSentOhlcDataMap.put(ticker, updatedData); // 캐시 업데이트
lastSentOhlcDataMap.put(ticker, updatedData);
} else {
// 이전 데이터도 없는 경우 빈 데이터 전송 (첫 구독 시)
log.debug("티커 {}의 이전 OHLC 데이터도 없습니다. 빈 데이터 전송", ticker);
RealTimeOhlcDto emptyData = new RealTimeOhlcDto();
emptyData.setTicker(ticker);
emptyData.setTimestamp(LocalDateTime.now());
emptyData.setOpen(0.0);
emptyData.setHigh(0.0);
emptyData.setLow(0.0);
emptyData.setClose(0.0);
emptyData.setVolume(0.0);

RealTimeOhlcDto emptyData = new RealTimeOhlcDto(ticker, now, 0.0, 0.0, 0.0, 0.0, 0.0);
messagingTemplate.convertAndSend("/topic/realTimeOhlc/" + ticker, emptyData);
lastSentOhlcDataMap.put(ticker, emptyData); // 캐시 업데이트
lastSentOhlcDataMap.put(ticker, emptyData);
}
} else {
// 조회된 실시간 OHLC 데이터 전송
messagingTemplate.convertAndSend("/topic/realTimeOhlc/" + ticker, ohlcData);
lastSentOhlcDataMap.put(ticker, ohlcData); // 캐시 업데이트
lastSentOhlcDataMap.put(ticker, ohlcData);
log.debug("실시간 OHLC 데이터 전송: {}", ohlcData);
}
} catch (Exception e) {
Expand All @@ -91,8 +75,5 @@ public void publishRealTimeOhlc() {
} catch (Exception e) {
log.error("△ 실시간 OHLC 데이터 발행 중 오류: {}", e.getMessage(), e);
}

}


}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import com.cleanengine.coin.chart.dto.RealTimeOhlcDto;
import com.cleanengine.coin.chart.service.ChartSubscriptionService;
import com.cleanengine.coin.chart.service.RealTimeOhlcService;
import com.cleanengine.coin.chart.service.RealTimeOhlcService; // RealTimeOhlcService 의존성은 이제 불필요
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
Expand All @@ -19,9 +19,8 @@
public class WebSocketMessageController {

private final ChartSubscriptionService subscriptionService;
private final RealTimeOhlcService realTimeOhlcService;
private final SimpMessagingTemplate messagingTemplate;
private final ChartDataController chartDataController;
private final ChartDataController chartDataController; // 이미 계산된 데이터를 가진 컨트롤러를 활용

/**
* 실시간 OHLC 데이터 구독 처리
Expand All @@ -34,30 +33,16 @@ public void subscribeRealTimeOhlc(RealTimeTradeMappingDto request) {
// 구독 목록에 추가
subscriptionService.subscribeRealTimeOhlc(ticker);

// 구독 즉시 최근 실시간 OHLC 데이터 전송
RealTimeOhlcDto latestOhlcData = realTimeOhlcService.getRealTimeOhlc(ticker);

RealTimeOhlcDto lastSentData = chartDataController.getLastSentOhlcDataMap().get(ticker);

if (latestOhlcData == null) {
if (lastSentData != null) {
// 이전에 전송한 데이터가 있으면 재사용
log.debug("티커 {}의 실시간 OHLC 데이터가 없습니다. 이전 데이터 재사용", ticker);
messagingTemplate.convertAndSend("/topic/realTimeOhlc/" + ticker, lastSentData);
} else {
// 이전 데이터도 없는 경우 빈 데이터 전송
log.debug("티커 {}의 실시간 OHLC 데이터가 없습니다. 빈 데이터 전송", ticker);
RealTimeOhlcDto emptyData = createEmptyRealTimeOhlcDto(ticker);
messagingTemplate.convertAndSend("/topic/realTimeOhlc/" + ticker, emptyData);
// 빈 데이터도 캐시에 저장
chartDataController.getLastSentOhlcDataMap().put(ticker, emptyData);
}
if (lastSentData == null) {
log.debug("티커 {}의 캐시된 OHLC 데이터가 없습니다. 빈 데이터 전송", ticker);
RealTimeOhlcDto emptyData = createEmptyRealTimeOhlcDto(ticker);
messagingTemplate.convertAndSend("/topic/realTimeOhlc/" + ticker, emptyData);
} else {
log.debug("티커 {}의 실시간 OHLC 데이터 전송: {}", ticker, latestOhlcData);
messagingTemplate.convertAndSend("/topic/realTimeOhlc/" + ticker, latestOhlcData);
// 데이터 캐시에 저장
chartDataController.getLastSentOhlcDataMap().put(ticker, latestOhlcData);

// 캐시된 데이터가 있으면 즉시 전송
log.debug("티커 {}의 캐시된 OHLC 데이터 즉시 전송: {}", ticker, lastSentData);
messagingTemplate.convertAndSend("/topic/realTimeOhlc/" + ticker, lastSentData);
}
}

Expand All @@ -80,6 +65,5 @@ private RealTimeOhlcDto createEmptyRealTimeOhlcDto(String ticker) {
@Getter
public static class RealTimeTradeMappingDto {
private String ticker;

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -20,101 +21,86 @@ public class RealTimeOhlcService {

private final TradeRepository tradeRepository;

// 티커별 마지막 처리 시간
private final Map<String, LocalDateTime> lastProcessedTimeMap = new ConcurrentHashMap<>();

// 티커별 마지막 OHLC 데이터 캐싱
private final Map<String, RealTimeOhlcDto> lastOhlcDataMap = new ConcurrentHashMap<>();
private final Map<String, RealTimeOhlcDto> currentMinuteOhlcCache = new ConcurrentHashMap<>();

/**
* 특정 티커의 최신 1초 OHLC 데이터 생성
*/
public RealTimeOhlcDto getRealTimeOhlc(String ticker) {
try {
LocalDateTime now = LocalDateTime.now();

// 시간 범위 계산
TimeRange timeRange = calculateTimeRange(ticker, now);
public RealTimeOhlcDto getAndUpdateCumulative1mOhlc(String ticker, LocalDateTime now ) {
try {
LocalDateTime currentMinuteStart = now.truncatedTo(ChronoUnit.MINUTES);

// 거래 데이터 조회 및 전처리
List<Trade> recentTrades = getProcessedTradeData(ticker, timeRange);
RealTimeOhlcDto cachedOhlc = currentMinuteOhlcCache.get(ticker);

// 거래 데이터가 없으면 캐시된 데이터 반환
if (recentTrades.isEmpty()) {
return getCachedData(ticker);
if (cachedOhlc == null || cachedOhlc.getTimestamp().isBefore(currentMinuteStart)) {
return handleNewMinute(ticker, now, currentMinuteStart);
}
else {
return handleExistingMinute(ticker, now, cachedOhlc);
}
} catch (Exception e) {
log.error("티커 {}의 누적 OHLC 데이터 생성 중 오류 발생: {}", ticker, e.getMessage(), e);
// 오류 발생 시 캐시된 마지막 데이터라도 반환
return currentMinuteOhlcCache.get(ticker);
}
}

calculateOhlcv ohlcv = getCalculateOhlcv(recentTrades);
private RealTimeOhlcDto handleNewMinute(String ticker, LocalDateTime now, LocalDateTime minuteStart) {
log.debug("티커 {}: 새로운 1분봉 시작 ({}).", ticker, minuteStart);
List<Trade> trades = tradeRepository.findByTickerAndTradeTimeBetweenOrderByTradeTimeAsc(ticker, minuteStart, now);

RealTimeOhlcDto ohlcData = createOhlcDto(ticker, now, ohlcv);
if (trades.isEmpty()) {
return null;
}

// 캐시 업데이트
updateCache(ticker, now, ohlcData);
// 새 거래내역으로 OHLCV 계산
CalculateOhlcv ohlcv = getCalculateOhlcv(trades);
RealTimeOhlcDto newOhlc = createOhlcDto(ticker, now, ohlcv);

return ohlcData;
} catch (Exception e) {
log.error("실시간 OHLC 데이터 생성 중 오류: {}", e.getMessage(), e);
return getCachedData(ticker);
}
// 캐시를 새로운 1분봉 데이터로 교체
currentMinuteOhlcCache.put(ticker, newOhlc);
return newOhlc;
}

// 시간 범위 계산
TimeRange calculateTimeRange(String ticker, LocalDateTime now) {
LocalDateTime lastProcessedTime = lastProcessedTimeMap.getOrDefault(
ticker, now.minusSeconds(1));
return new TimeRange(lastProcessedTime, now);
}

// 거래 데이터 조회 및 전처리
List<Trade> getProcessedTradeData(String ticker, TimeRange timeRange) {
return tradeRepository.findByTickerAndTradeTimeBetweenOrderByTradeTimeAsc(
ticker,
timeRange.start(),
timeRange.end()
);
}
private RealTimeOhlcDto handleExistingMinute(String ticker, LocalDateTime now, RealTimeOhlcDto cachedOhlc) {
LocalDateTime lastProcessedTime = cachedOhlc.getTimestamp();
List<Trade> newTrades = tradeRepository.findByTickerAndTradeTimeBetweenOrderByTradeTimeAsc(ticker, lastProcessedTime, now);

// 캐시 업데이트
void updateCache(String ticker, LocalDateTime now, RealTimeOhlcDto ohlcData) {
lastProcessedTimeMap.put(ticker, now);
lastOhlcDataMap.put(ticker, ohlcData);
}
// 새로운 거래가 없다면, 타임스탬프만 최신으로 업데이트하여 "살아있음"을 알림
if (newTrades.isEmpty()) {
cachedOhlc.setTimestamp(now);
return cachedOhlc;
}

// 캐시된 데이터 조회
RealTimeOhlcDto getCachedData(String ticker) {
return lastOhlcDataMap.getOrDefault(ticker, null);
}
log.trace("티커 {}: 기존 1분봉 업데이트. 신규 거래 {}건", ticker, newTrades.size());

// DTO 생성
RealTimeOhlcDto createOhlcDto(String ticker, LocalDateTime timestamp, calculateOhlcv ohlcv) {
return new RealTimeOhlcDto(
ticker,
timestamp,
ohlcv.open(),
ohlcv.high(),
ohlcv.low(),
ohlcv.close(),
ohlcv.volume()
);
// Open(시가)는 분이 끝날때까지 고정
cachedOhlc.setHigh(Math.max(cachedOhlc.getHigh(), newTrades.stream().mapToDouble(Trade::getPrice).max().orElse(cachedOhlc.getHigh())));
cachedOhlc.setLow(Math.min(cachedOhlc.getLow(), newTrades.stream().mapToDouble(Trade::getPrice).min().orElse(cachedOhlc.getLow())));
cachedOhlc.setClose(newTrades.getLast().getPrice()); // 종가는 항상 마지막 거래 가격
cachedOhlc.setVolume(cachedOhlc.getVolume() + newTrades.stream().mapToDouble(Trade::getSize).sum());
cachedOhlc.setTimestamp(now); // 마지막 처리 시간 갱신

return cachedOhlc;
}

// OHLCV 계산 메서드


@NotNull
static calculateOhlcv getCalculateOhlcv(List<Trade> trades) {
// trades는 시간 오름차순 정렬되어 있음
Double open = trades.getFirst().getPrice(); // 첫 번째(가장 오래된) = Open ✅
Double close = trades.getLast().getPrice(); // 마지막(가장 최근) = Close ✅
private CalculateOhlcv getCalculateOhlcv(List<Trade> trades) {
Double open = trades.getFirst().getPrice();
Double close = trades.getLast().getPrice();
Double high = trades.stream().mapToDouble(Trade::getPrice).max().orElse(0.0);
Double low = trades.stream().mapToDouble(Trade::getPrice).min().orElse(0.0);
Double volume = trades.stream().mapToDouble(Trade::getSize).sum();

return new calculateOhlcv(open, high, low, close, volume);
return new CalculateOhlcv(open, high, low, close, volume);
}

private RealTimeOhlcDto createOhlcDto(String ticker, LocalDateTime timestamp, CalculateOhlcv ohlcv) {
return new RealTimeOhlcDto(
ticker, timestamp, ohlcv.open(), ohlcv.high(), ohlcv.low(), ohlcv.close(), ohlcv.volume()
);
}



record TimeRange(LocalDateTime start, LocalDateTime end) {}

record calculateOhlcv(Double open, Double high, Double low, Double close, Double volume) {}
record CalculateOhlcv(Double open, Double high, Double low, Double close, Double volume) {}
}
Loading