99import org .springframework .stereotype .Service ;
1010
1111import java .time .LocalDateTime ;
12+ import java .time .temporal .ChronoUnit ;
1213import java .util .List ;
1314import java .util .Map ;
1415import java .util .concurrent .ConcurrentHashMap ;
@@ -20,101 +21,86 @@ public class RealTimeOhlcService {
2021
2122 private final TradeRepository tradeRepository ;
2223
23- // 티커별 마지막 처리 시간
24- private final Map <String , LocalDateTime > lastProcessedTimeMap = new ConcurrentHashMap <>();
2524
26- // 티커별 마지막 OHLC 데이터 캐싱
27- private final Map <String , RealTimeOhlcDto > lastOhlcDataMap = new ConcurrentHashMap <>();
25+ private final Map <String , RealTimeOhlcDto > currentMinuteOhlcCache = new ConcurrentHashMap <>();
2826
29- /**
30- * 특정 티커의 최신 1초 OHLC 데이터 생성
31- */
32- public RealTimeOhlcDto getRealTimeOhlc (String ticker ) {
33- try {
34- LocalDateTime now = LocalDateTime .now ();
3527
36- // 시간 범위 계산
37- TimeRange timeRange = calculateTimeRange (ticker , now );
28+ public RealTimeOhlcDto getAndUpdateCumulative1mOhlc (String ticker , LocalDateTime now ) {
29+ try {
30+ LocalDateTime currentMinuteStart = now .truncatedTo (ChronoUnit .MINUTES );
3831
39- // 거래 데이터 조회 및 전처리
40- List <Trade > recentTrades = getProcessedTradeData (ticker , timeRange );
32+ RealTimeOhlcDto cachedOhlc = currentMinuteOhlcCache .get (ticker );
4133
42- // 거래 데이터가 없으면 캐시된 데이터 반환
43- if (recentTrades .isEmpty ()) {
44- return getCachedData (ticker );
34+ if (cachedOhlc == null || cachedOhlc .getTimestamp ().isBefore (currentMinuteStart )) {
35+ return handleNewMinute (ticker , now , currentMinuteStart );
4536 }
37+ else {
38+ return handleExistingMinute (ticker , now , cachedOhlc );
39+ }
40+ } catch (Exception e ) {
41+ log .error ("티커 {}의 누적 OHLC 데이터 생성 중 오류 발생: {}" , ticker , e .getMessage (), e );
42+ // 오류 발생 시 캐시된 마지막 데이터라도 반환
43+ return currentMinuteOhlcCache .get (ticker );
44+ }
45+ }
4646
47- calculateOhlcv ohlcv = getCalculateOhlcv (recentTrades );
47+ private RealTimeOhlcDto handleNewMinute (String ticker , LocalDateTime now , LocalDateTime minuteStart ) {
48+ log .debug ("티커 {}: 새로운 1분봉 시작 ({})." , ticker , minuteStart );
49+ List <Trade > trades = tradeRepository .findByTickerAndTradeTimeBetweenOrderByTradeTimeAsc (ticker , minuteStart , now );
4850
49- RealTimeOhlcDto ohlcData = createOhlcDto (ticker , now , ohlcv );
51+ if (trades .isEmpty ()) {
52+ return null ;
53+ }
5054
51- // 캐시 업데이트
52- updateCache (ticker , now , ohlcData );
55+ // 새 거래내역으로 OHLCV 계산
56+ CalculateOhlcv ohlcv = getCalculateOhlcv (trades );
57+ RealTimeOhlcDto newOhlc = createOhlcDto (ticker , now , ohlcv );
5358
54- return ohlcData ;
55- } catch (Exception e ) {
56- log .error ("실시간 OHLC 데이터 생성 중 오류: {}" , e .getMessage (), e );
57- return getCachedData (ticker );
58- }
59+ // 캐시를 새로운 1분봉 데이터로 교체
60+ currentMinuteOhlcCache .put (ticker , newOhlc );
61+ return newOhlc ;
5962 }
6063
61- // 시간 범위 계산
62- TimeRange calculateTimeRange (String ticker , LocalDateTime now ) {
63- LocalDateTime lastProcessedTime = lastProcessedTimeMap .getOrDefault (
64- ticker , now .minusSeconds (1 ));
65- return new TimeRange (lastProcessedTime , now );
66- }
6764
68- // 거래 데이터 조회 및 전처리
69- List <Trade > getProcessedTradeData (String ticker , TimeRange timeRange ) {
70- return tradeRepository .findByTickerAndTradeTimeBetweenOrderByTradeTimeAsc (
71- ticker ,
72- timeRange .start (),
73- timeRange .end ()
74- );
75- }
65+ private RealTimeOhlcDto handleExistingMinute (String ticker , LocalDateTime now , RealTimeOhlcDto cachedOhlc ) {
66+ LocalDateTime lastProcessedTime = cachedOhlc .getTimestamp ();
67+ List <Trade > newTrades = tradeRepository .findByTickerAndTradeTimeBetweenOrderByTradeTimeAsc (ticker , lastProcessedTime , now );
7668
77- // 캐시 업데이트
78- void updateCache ( String ticker , LocalDateTime now , RealTimeOhlcDto ohlcData ) {
79- lastProcessedTimeMap . put ( ticker , now );
80- lastOhlcDataMap . put ( ticker , ohlcData ) ;
81- }
69+ // 새로운 거래가 없다면, 타임스탬프만 최신으로 업데이트하여 "살아있음"을 알림
70+ if ( newTrades . isEmpty () ) {
71+ cachedOhlc . setTimestamp ( now );
72+ return cachedOhlc ;
73+ }
8274
83- // 캐시된 데이터 조회
84- RealTimeOhlcDto getCachedData (String ticker ) {
85- return lastOhlcDataMap .getOrDefault (ticker , null );
86- }
75+ log .trace ("티커 {}: 기존 1분봉 업데이트. 신규 거래 {}건" , ticker , newTrades .size ());
8776
88- // DTO 생성
89- RealTimeOhlcDto createOhlcDto (String ticker , LocalDateTime timestamp , calculateOhlcv ohlcv ) {
90- return new RealTimeOhlcDto (
91- ticker ,
92- timestamp ,
93- ohlcv .open (),
94- ohlcv .high (),
95- ohlcv .low (),
96- ohlcv .close (),
97- ohlcv .volume ()
98- );
77+ // Open(시가)는 분이 끝날때까지 고정
78+ cachedOhlc .setHigh (Math .max (cachedOhlc .getHigh (), newTrades .stream ().mapToDouble (Trade ::getPrice ).max ().orElse (cachedOhlc .getHigh ())));
79+ cachedOhlc .setLow (Math .min (cachedOhlc .getLow (), newTrades .stream ().mapToDouble (Trade ::getPrice ).min ().orElse (cachedOhlc .getLow ())));
80+ cachedOhlc .setClose (newTrades .getLast ().getPrice ()); // 종가는 항상 마지막 거래 가격
81+ cachedOhlc .setVolume (cachedOhlc .getVolume () + newTrades .stream ().mapToDouble (Trade ::getSize ).sum ());
82+ cachedOhlc .setTimestamp (now ); // 마지막 처리 시간 갱신
83+
84+ return cachedOhlc ;
9985 }
10086
101- // OHLCV 계산 메서드
87+
88+
10289 @ NotNull
103- static calculateOhlcv getCalculateOhlcv (List <Trade > trades ) {
104- // trades는 시간 오름차순 정렬되어 있음
105- Double open = trades .getFirst ().getPrice (); // 첫 번째(가장 오래된) = Open ✅
106- Double close = trades .getLast ().getPrice (); // 마지막(가장 최근) = Close ✅
90+ private CalculateOhlcv getCalculateOhlcv (List <Trade > trades ) {
91+ Double open = trades .getFirst ().getPrice ();
92+ Double close = trades .getLast ().getPrice ();
10793 Double high = trades .stream ().mapToDouble (Trade ::getPrice ).max ().orElse (0.0 );
10894 Double low = trades .stream ().mapToDouble (Trade ::getPrice ).min ().orElse (0.0 );
10995 Double volume = trades .stream ().mapToDouble (Trade ::getSize ).sum ();
110-
111- return new calculateOhlcv (open , high , low , close , volume );
96+ return new CalculateOhlcv (open , high , low , close , volume );
11297 }
11398
99+ private RealTimeOhlcDto createOhlcDto (String ticker , LocalDateTime timestamp , CalculateOhlcv ohlcv ) {
100+ return new RealTimeOhlcDto (
101+ ticker , timestamp , ohlcv .open (), ohlcv .high (), ohlcv .low (), ohlcv .close (), ohlcv .volume ()
102+ );
103+ }
114104
115-
116-
117- record TimeRange (LocalDateTime start , LocalDateTime end ) {}
118-
119- record calculateOhlcv (Double open , Double high , Double low , Double close , Double volume ) {}
105+ record CalculateOhlcv (Double open , Double high , Double low , Double close , Double volume ) {}
120106}
0 commit comments