diff --git a/src/main/java/com/factoreal/backend/dto/SensorDto.java b/src/main/java/com/factoreal/backend/dto/SensorDto.java index ddb0f949..5e13afe9 100644 --- a/src/main/java/com/factoreal/backend/dto/SensorDto.java +++ b/src/main/java/com/factoreal/backend/dto/SensorDto.java @@ -15,6 +15,7 @@ public class SensorDto { // BE -> FE 용 DTO private String equipId; private Double sensorThres; // 임계치 private Double allowVal; // 허용치 + private Integer isZone; public static SensorDto fromEntity(Sensor sensor) { if (sensor == null) return null; @@ -26,6 +27,7 @@ public static SensorDto fromEntity(Sensor sensor) { .equipId(sensor.getEquip().getEquipId()) .sensorThres(sensor.getSensorThres()) .allowVal(sensor.getAllowVal()) + .isZone(sensor.getIsZone()) .build(); } } \ No newline at end of file diff --git a/src/main/java/com/factoreal/backend/dto/SensorKafkaDto.java b/src/main/java/com/factoreal/backend/dto/SensorKafkaDto.java index 466ae0d1..f1ae5000 100644 --- a/src/main/java/com/factoreal/backend/dto/SensorKafkaDto.java +++ b/src/main/java/com/factoreal/backend/dto/SensorKafkaDto.java @@ -9,12 +9,16 @@ @Setter @NoArgsConstructor @AllArgsConstructor -// Kafka mapper용 DTO +/** + * Kafka Sensor 데이터 mapper용 DTO + */ public class SensorKafkaDto { - private String zoneId; - private String equipId; - private String sensorId; - private String sensorType; - private Double val; // 측정값 단위 - private String time; // 센서 생성시간 + private String zoneId; // 공간 ID + private String equipId; // 설비 ID + private String sensorId; // 센서 ID + private String sensorType; // 센서 타입 + private Double val; // 센서 측정값 + private String Category; // 센서 카테고리 {ENVIRONMENT, EQUIPMENT} + private String time; // 센서 탐지 시간 + private int dangerLevel; // 위험도 } diff --git a/src/main/java/com/factoreal/backend/entity/Sensor.java b/src/main/java/com/factoreal/backend/entity/Sensor.java index f24b69b2..2c9d7835 100644 --- a/src/main/java/com/factoreal/backend/entity/Sensor.java +++ b/src/main/java/com/factoreal/backend/entity/Sensor.java @@ -1,6 +1,6 @@ package com.factoreal.backend.entity; -import com.factoreal.backend.strategy.enums.SensorType; +import com.factoreal.backend.kafka.strategy.enums.SensorType; import jakarta.persistence.*; import lombok.*; import java.time.LocalDateTime; diff --git a/src/main/java/com/factoreal/backend/consumer/kafka/KafkaConsumer.java b/src/main/java/com/factoreal/backend/kafka/KafkaConsumerD.java similarity index 95% rename from src/main/java/com/factoreal/backend/consumer/kafka/KafkaConsumer.java rename to src/main/java/com/factoreal/backend/kafka/KafkaConsumerD.java index fb0dc44f..e8defda5 100644 --- a/src/main/java/com/factoreal/backend/consumer/kafka/KafkaConsumer.java +++ b/src/main/java/com/factoreal/backend/kafka/KafkaConsumerD.java @@ -1,19 +1,17 @@ -package com.factoreal.backend.consumer.kafka; +package com.factoreal.backend.kafka; import com.factoreal.backend.dto.SensorKafkaDto; import com.factoreal.backend.dto.abnormalLog.LogType; import com.factoreal.backend.entity.AbnormalLog; import com.factoreal.backend.dto.SystemLogDto; -import com.factoreal.backend.entity.Zone; import com.factoreal.backend.sender.WebSocketSender; import com.factoreal.backend.service.ZoneService; -import com.factoreal.backend.entity.AbnormalLog; import com.factoreal.backend.service.AbnormalLogService; -import com.factoreal.backend.strategy.NotificationStrategy; -import com.factoreal.backend.strategy.NotificationStrategyFactory; -import com.factoreal.backend.strategy.enums.AlarmEventDto; -import com.factoreal.backend.strategy.enums.RiskLevel; -import com.factoreal.backend.strategy.enums.SensorType; +import com.factoreal.backend.kafka.strategy.alarmList.NotificationStrategy; +import com.factoreal.backend.kafka.strategy.NotificationStrategyFactory; +import com.factoreal.backend.kafka.strategy.enums.AlarmEventDto; +import com.factoreal.backend.kafka.strategy.enums.RiskLevel; +import com.factoreal.backend.kafka.strategy.enums.SensorType; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.RequiredArgsConstructor; @@ -22,7 +20,6 @@ import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.springframework.beans.factory.annotation.Value; -import org.springframework.kafka.annotation.KafkaListener; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import com.factoreal.backend.service.SensorService; @@ -31,19 +28,15 @@ import java.time.ZoneId; import java.time.Instant; -import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.List; import java.util.Map; -import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Stream; -import java.util.Objects; @Service @Slf4j @RequiredArgsConstructor -public class KafkaConsumer { +public class KafkaConsumerD { private final ObjectMapper objectMapper; private final WebSocketSender webSocketSender; @@ -69,8 +62,8 @@ public class KafkaConsumer { // @KafkaListener(topics = {"EQUIPMENT", "ENVIRONMENT"}, groupId = // "monitory-consumer-group-1") - @KafkaListener(topics = { "EQUIPMENT", - "ENVIRONMENT" }, groupId = "${spring.kafka.consumer.group-id:danger-alert-group}") +// @KafkaListener(topics = { "EQUIPMENT", +// "ENVIRONMENT" }, groupId = "${spring.kafka.consumer.group-id:danger-alert-group}") public void consume(String message) { log.info("💡수신한 Kafka 메시지 : " + message); @@ -87,6 +80,7 @@ public void consume(String message) { // 비동기 ES 저장 // ################################# saveToElasticsearch(dto); + log.info("▶︎ 위험도 감지 start"); int dangerLevel = getDangerLevel(dto.getSensorType(), dto.getVal()); log.info("⚠️ 위험도 {} 센서 타입 : {} 감지됨. Zone: {}", dangerLevel, dto.getSensorType(), dto.getZoneId()); diff --git a/src/main/java/com/factoreal/backend/kafka/consumer/KafkaConsumer.java b/src/main/java/com/factoreal/backend/kafka/consumer/KafkaConsumer.java new file mode 100644 index 00000000..7a3cfff5 --- /dev/null +++ b/src/main/java/com/factoreal/backend/kafka/consumer/KafkaConsumer.java @@ -0,0 +1,52 @@ +package com.factoreal.backend.kafka.consumer; + +import com.factoreal.backend.dto.SensorKafkaDto; +import com.factoreal.backend.kafka.processor.SensorEventProcessor; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + + +/** + * KafkaConsumer 클래스는 Kafka 토픽으로부터 메시지를 수신하고, + * 해당 메시지를 SensorEventProcessor.java 전달 + */ +@Component +@RequiredArgsConstructor +@Slf4j +public class KafkaConsumer { + + private final ObjectMapper objectMapper; + private final SensorEventProcessor sensorEventProcessor; + + // 설비 센서 관련 Kafka 메시지 처리 + // Todo : 설비 머신러닝 끝나고 수정 예정 +// @KafkaListener(topics = "EQUIPMENT", groupId = "equipment-consumer-group") + public void consumeEquipment(String message) { + log.info("📩 [EQUIPMENT] Kafka 메시지 수신: {}", message); + handleMessage(message, "EQUIPMENT"); + } + + // 공간 센서 관련 Kafka 메시지 처리 + @KafkaListener(topics = "ENVIRONMENT", groupId = "environment-consumer-group") + public void consumeEnvironment(String message) { + log.info("📩 [ENVIRONMENT] Kafka 메시지 수신: {}", message); + handleMessage(message, "ENVIRONMENT"); + } + + // 공통 메시지 파싱 및 처리 전달 + private void handleMessage(String message, String topic) { + try { + SensorKafkaDto dto = objectMapper.readValue(message, SensorKafkaDto.class); + log.info("✅ Kafka 메시지 파싱 완료: sensorId={}, zoneId={}, val={}", + dto.getSensorId(), dto.getZoneId(), dto.getVal()); + sensorEventProcessor.process(dto, topic); + log.info("✅ Kafka 메시지 처리 위임 완료: topic={}", topic); + } catch (Exception e) { + log.error("❌ Kafka 메시지 파싱 또는 처리 실패: {}", message, e); + } + } + +} \ No newline at end of file diff --git a/src/main/java/com/factoreal/backend/kafka/processor/SensorEventProcessor.java b/src/main/java/com/factoreal/backend/kafka/processor/SensorEventProcessor.java new file mode 100644 index 00000000..32c2764a --- /dev/null +++ b/src/main/java/com/factoreal/backend/kafka/processor/SensorEventProcessor.java @@ -0,0 +1,113 @@ +package com.factoreal.backend.kafka.processor; + +import com.factoreal.backend.dto.SensorKafkaDto; +import com.factoreal.backend.dto.abnormalLog.LogType; +import com.factoreal.backend.entity.AbnormalLog; +import com.factoreal.backend.sender.WebSocketSender; +import com.factoreal.backend.service.AbnormalLogService; +import com.factoreal.backend.service.AlarmEventService; +import com.factoreal.backend.service.AutoControlService; +import com.factoreal.backend.kafka.strategy.enums.RiskLevel; +import com.factoreal.backend.kafka.strategy.enums.SensorType; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +/** + * SensorEventProcessor 클래스는 Kafka로부터 전달받은 센서 데이터를 분석 및 처리하는 클래스입니다. + */ +@Service +@RequiredArgsConstructor +@Slf4j +public class SensorEventProcessor { + + private final AutoControlService autoControlService; + private final AbnormalLogService abnormalLogService; + private final AlarmEventService alarmEventService; + private final WebSocketSender webSocketSender; + + /** + * 센서 Kafka 메시지 처리 + * @param dto 센서 데이터 + * @param topic Kafka 토픽명 (EQUIPMENT, ENVIRONMENT) + */ + public void process(SensorKafkaDto dto, String topic) { + try { + + // 유효성 검사: zoneId와 sensorId는 필수 + if (dto.getZoneId() == null || dto.getZoneId().isBlank()) { + log.warn("⚠️ 유효하지 않은 zoneId: null 또는 빈 문자열"); + return; + } + if (dto.getSensorId() == null || dto.getSensorId().isBlank()) { + log.warn("⚠️ 유효하지 않은 sensorId: null 또는 빈 문자열"); + return; + } + + // ENVIRONMENT 토픽인 경우에만 아래 처리 로직 수행 + if ("ENVIRONMENT".equalsIgnoreCase(topic)) { + if (dto.getEquipId() == null || !dto.getEquipId().equals(dto.getZoneId())) { + log.warn("⚠️ ENVIRONMENT 토픽이지만 equipId와 zoneId가 일치하지 않음: equipId={}, zoneId={}", + dto.getEquipId(), dto.getZoneId()); + return; + } + + // 위험도 계산 + int dangerLevel = dto.getDangerLevel(); + SensorType sensorType = SensorType.getSensorType(dto.getSensorType()); + RiskLevel riskLevel = RiskLevel.fromPriority(dangerLevel); + LogType logType = topicToLogType(topic); + + // 자동 제어 메시지 판단 (Todo - 진행중) + autoControlService.evaluate(dto, dangerLevel); + + // 이상 로그 저장 + AbnormalLog abnLog = abnormalLogService.saveAbnormalLogFromKafkaDto( + dto, sensorType, riskLevel, logType); + + + // 읽지 않은 알림 수 조회 + Long count = abnormalLogService.readRequired(); + + // WebSocket 알림 전송 + // 1. 히트맵 전송 + webSocketSender.sendDangerLevel(dto.getZoneId(), dto.getSensorType(), dangerLevel); + // 2. 위험 알림 전송 -> 위험도별 Websocket + wearable + Slack(SMS 대체) + // Todo : (As-is) 전략 기반 startAlarm() 메서드 담당자 확인 필요 +// webSocketSender.sendDangerAlarm(abnLog.toAlarmEventDto()); + alarmEventService.startAlarm(dto,abnLog,dangerLevel); + // 3. 읽지 않은 수 전송 + webSocketSender.sendUnreadCount(count); + + + + log.info("✅ 센서 이벤트 처리 완료: sensorId={}, zoneId={}, level={} ({} topic)", + dto.getSensorId(), dto.getZoneId(), dangerLevel, topic); + } + } catch (Exception e) { + log.error("❌ 센서 이벤트 처리 실패: sensorId={}, zoneId={}", dto.getSensorId(), dto.getZoneId(), e); + } + } + + // 공간에 위험도 분기 로직 + // Todo : Flink에서 적용으로 변경되어 삭제 예정 + private int getDangerLevel(String sensorType, double val) { + switch (sensorType.toLowerCase()) { + case "temp": return val > 40 || val < -35 ? 2 : (val > 30 || val < 25 ? 1 : 0); + case "humid": return val >= 80 ? 2 : (val >= 60 ? 1 : 0); + case "vibration": return val > 7.1 ? 2 : (val > 2.8 ? 1 : 0); + case "current": return val >= 30 ? 2 : (val >= 7 ? 1 : 0); + case "dust": return val >= 150 ? 2 : (val >= 75 ? 1 : 0); + default: return 0; + } + } + + // topic enum 변경하기 + private LogType topicToLogType(String topic) { + return switch (topic.toUpperCase()) { + case "EQUIPMENT" -> LogType.Equip; + case "ENVIRONMENT" -> LogType.Sensor; + default -> throw new IllegalArgumentException("지원하지 않는 Kafka 토픽: " + topic); + }; + } +} diff --git a/src/main/java/com/factoreal/backend/strategy/NotificationStrategyFactory.java b/src/main/java/com/factoreal/backend/kafka/strategy/NotificationStrategyFactory.java similarity index 90% rename from src/main/java/com/factoreal/backend/strategy/NotificationStrategyFactory.java rename to src/main/java/com/factoreal/backend/kafka/strategy/NotificationStrategyFactory.java index c379948a..34b0a58f 100644 --- a/src/main/java/com/factoreal/backend/strategy/NotificationStrategyFactory.java +++ b/src/main/java/com/factoreal/backend/kafka/strategy/NotificationStrategyFactory.java @@ -1,6 +1,7 @@ -package com.factoreal.backend.strategy; +package com.factoreal.backend.kafka.strategy; -import com.factoreal.backend.strategy.enums.RiskLevel; +import com.factoreal.backend.kafka.strategy.alarmList.NotificationStrategy; +import com.factoreal.backend.kafka.strategy.enums.RiskLevel; import org.springframework.stereotype.Component; import java.util.List; diff --git a/src/main/java/com/factoreal/backend/strategy/AppPushNotificationStrategy.java b/src/main/java/com/factoreal/backend/kafka/strategy/alarmList/AppPushNotificationStrategy.java similarity index 66% rename from src/main/java/com/factoreal/backend/strategy/AppPushNotificationStrategy.java rename to src/main/java/com/factoreal/backend/kafka/strategy/alarmList/AppPushNotificationStrategy.java index 248a05f1..b0279f22 100644 --- a/src/main/java/com/factoreal/backend/strategy/AppPushNotificationStrategy.java +++ b/src/main/java/com/factoreal/backend/kafka/strategy/alarmList/AppPushNotificationStrategy.java @@ -1,7 +1,7 @@ -package com.factoreal.backend.strategy; +package com.factoreal.backend.kafka.strategy.alarmList; -import com.factoreal.backend.strategy.enums.AlarmEventDto; -import com.factoreal.backend.strategy.enums.RiskLevel; +import com.factoreal.backend.kafka.strategy.enums.AlarmEventDto; +import com.factoreal.backend.kafka.strategy.enums.RiskLevel; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -9,7 +9,7 @@ @Component("APP") public class AppPushNotificationStrategy implements NotificationStrategy { - + // TODO FCM 전송 로직 @Override public void send(AlarmEventDto alarmEventDto) { log.info("📲 App Push Notification Strategy."); diff --git a/src/main/java/com/factoreal/backend/kafka/strategy/alarmList/NotificationStrategy.java b/src/main/java/com/factoreal/backend/kafka/strategy/alarmList/NotificationStrategy.java new file mode 100644 index 00000000..0f1fd6c8 --- /dev/null +++ b/src/main/java/com/factoreal/backend/kafka/strategy/alarmList/NotificationStrategy.java @@ -0,0 +1,12 @@ +package com.factoreal.backend.kafka.strategy.alarmList; + +import com.factoreal.backend.kafka.strategy.enums.AlarmEventDto; +import com.factoreal.backend.kafka.strategy.enums.RiskLevel; + +public interface NotificationStrategy { + void send(AlarmEventDto alarmEventDto); + // 이 인터페이스를 상속받는 객체가 동작할 최소 위험 레벨을 설정 + // Kafka로 받아온 센서 데이터와 비교하기 위함. + // 센서의 Danger Level보다 같거나 낮은 Strategy들이 반환되기 위함. + RiskLevel getSupportedLevel(); +} diff --git a/src/main/java/com/factoreal/backend/strategy/SmsNotificationStrategy.java b/src/main/java/com/factoreal/backend/kafka/strategy/alarmList/SmsNotificationStrategy.java similarity index 88% rename from src/main/java/com/factoreal/backend/strategy/SmsNotificationStrategy.java rename to src/main/java/com/factoreal/backend/kafka/strategy/alarmList/SmsNotificationStrategy.java index e6283b0d..76c84cf6 100644 --- a/src/main/java/com/factoreal/backend/strategy/SmsNotificationStrategy.java +++ b/src/main/java/com/factoreal/backend/kafka/strategy/alarmList/SmsNotificationStrategy.java @@ -1,8 +1,8 @@ -package com.factoreal.backend.strategy; +package com.factoreal.backend.kafka.strategy.alarmList; -import com.factoreal.backend.strategy.enums.AlarmEventDto; +import com.factoreal.backend.kafka.strategy.enums.AlarmEventDto; import com.factoreal.backend.entity.Worker; -import com.factoreal.backend.strategy.enums.RiskLevel; +import com.factoreal.backend.kafka.strategy.enums.RiskLevel; import com.factoreal.backend.repository.WorkerRepository; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -22,6 +22,7 @@ public class SmsNotificationStrategy implements NotificationStrategy { private final WorkerRepository workerRepository; private static final String userId = "alarm-test"; + // TODO Seoul 리전에 SMS 지원이 안되기에 Slack으로 변경 @Override public void send(AlarmEventDto alarmEventDto) { log.info("📬 SMS Notification Strategy."); diff --git a/src/main/java/com/factoreal/backend/strategy/WebSocketNotificationStrategy.java b/src/main/java/com/factoreal/backend/kafka/strategy/alarmList/WebSocketNotificationStrategy.java similarity index 78% rename from src/main/java/com/factoreal/backend/strategy/WebSocketNotificationStrategy.java rename to src/main/java/com/factoreal/backend/kafka/strategy/alarmList/WebSocketNotificationStrategy.java index 3649a1c9..269da77d 100644 --- a/src/main/java/com/factoreal/backend/strategy/WebSocketNotificationStrategy.java +++ b/src/main/java/com/factoreal/backend/kafka/strategy/alarmList/WebSocketNotificationStrategy.java @@ -1,8 +1,8 @@ -package com.factoreal.backend.strategy; +package com.factoreal.backend.kafka.strategy.alarmList; import com.factoreal.backend.sender.WebSocketSender; -import com.factoreal.backend.strategy.enums.AlarmEventDto; -import com.factoreal.backend.strategy.enums.RiskLevel; +import com.factoreal.backend.kafka.strategy.enums.AlarmEventDto; +import com.factoreal.backend.kafka.strategy.enums.RiskLevel; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -10,11 +10,12 @@ @RequiredArgsConstructor @Component @Slf4j -public class WebSocketNotificationStrategy implements NotificationStrategy{ +public class WebSocketNotificationStrategy implements NotificationStrategy { // SimpMessagingTemplate은 WebSocketConfig.java에 EnableWebSocketMessageBroker 어노테이션에 의해 빈이 등록됨. private final WebSocketSender webSocketSender; private static final String userId = "alarm-test"; + @Override public void send(AlarmEventDto alarmEventDto) { log.info("🌐WebSocket Notification Strategy"); diff --git a/src/main/java/com/factoreal/backend/strategy/DefaultRiskMessageProvider.java b/src/main/java/com/factoreal/backend/kafka/strategy/alarmMessage/DefaultRiskMessageProvider.java similarity index 91% rename from src/main/java/com/factoreal/backend/strategy/DefaultRiskMessageProvider.java rename to src/main/java/com/factoreal/backend/kafka/strategy/alarmMessage/DefaultRiskMessageProvider.java index 3b8cb63e..347deedd 100644 --- a/src/main/java/com/factoreal/backend/strategy/DefaultRiskMessageProvider.java +++ b/src/main/java/com/factoreal/backend/kafka/strategy/alarmMessage/DefaultRiskMessageProvider.java @@ -1,7 +1,7 @@ -package com.factoreal.backend.strategy; +package com.factoreal.backend.kafka.strategy.alarmMessage; -import com.factoreal.backend.strategy.enums.RiskLevel; -import com.factoreal.backend.strategy.enums.SensorType; +import com.factoreal.backend.kafka.strategy.enums.RiskLevel; +import com.factoreal.backend.kafka.strategy.enums.SensorType; import org.springframework.stereotype.Component; @Component diff --git a/src/main/java/com/factoreal/backend/kafka/strategy/alarmMessage/RiskMessageProvider.java b/src/main/java/com/factoreal/backend/kafka/strategy/alarmMessage/RiskMessageProvider.java new file mode 100644 index 00000000..97516636 --- /dev/null +++ b/src/main/java/com/factoreal/backend/kafka/strategy/alarmMessage/RiskMessageProvider.java @@ -0,0 +1,8 @@ +package com.factoreal.backend.kafka.strategy.alarmMessage; + +import com.factoreal.backend.kafka.strategy.enums.RiskLevel; +import com.factoreal.backend.kafka.strategy.enums.SensorType; + +public interface RiskMessageProvider { + String getMessage(SensorType sensorType, RiskLevel riskLevel); +} diff --git a/src/main/java/com/factoreal/backend/strategy/enums/AlarmEventDto.java b/src/main/java/com/factoreal/backend/kafka/strategy/enums/AlarmEventDto.java similarity index 94% rename from src/main/java/com/factoreal/backend/strategy/enums/AlarmEventDto.java rename to src/main/java/com/factoreal/backend/kafka/strategy/enums/AlarmEventDto.java index ad6dccfb..caf29649 100644 --- a/src/main/java/com/factoreal/backend/strategy/enums/AlarmEventDto.java +++ b/src/main/java/com/factoreal/backend/kafka/strategy/enums/AlarmEventDto.java @@ -1,10 +1,8 @@ -package com.factoreal.backend.strategy.enums; +package com.factoreal.backend.kafka.strategy.enums; import lombok.Builder; import lombok.Data; -import java.sql.Timestamp; - /** * Kafka Consumer에서 해당 객체 생성 * 다양한 채널로 발송될 알람의 내용을 담는 표준 이벤트 객체 diff --git a/src/main/java/com/factoreal/backend/strategy/enums/RiskLevel.java b/src/main/java/com/factoreal/backend/kafka/strategy/enums/RiskLevel.java similarity index 94% rename from src/main/java/com/factoreal/backend/strategy/enums/RiskLevel.java rename to src/main/java/com/factoreal/backend/kafka/strategy/enums/RiskLevel.java index 5ddfc042..f0f232bc 100644 --- a/src/main/java/com/factoreal/backend/strategy/enums/RiskLevel.java +++ b/src/main/java/com/factoreal/backend/kafka/strategy/enums/RiskLevel.java @@ -1,4 +1,4 @@ -package com.factoreal.backend.strategy.enums; +package com.factoreal.backend.kafka.strategy.enums; import lombok.Getter; diff --git a/src/main/java/com/factoreal/backend/strategy/enums/SensorType.java b/src/main/java/com/factoreal/backend/kafka/strategy/enums/SensorType.java similarity index 80% rename from src/main/java/com/factoreal/backend/strategy/enums/SensorType.java rename to src/main/java/com/factoreal/backend/kafka/strategy/enums/SensorType.java index fa52940a..e036b306 100644 --- a/src/main/java/com/factoreal/backend/strategy/enums/SensorType.java +++ b/src/main/java/com/factoreal/backend/kafka/strategy/enums/SensorType.java @@ -1,4 +1,4 @@ -package com.factoreal.backend.strategy.enums; +package com.factoreal.backend.kafka.strategy.enums; public enum SensorType { current, diff --git a/src/main/java/com/factoreal/backend/consumer/mqtt/MqttService.java b/src/main/java/com/factoreal/backend/mqtt/MqttService.java similarity index 95% rename from src/main/java/com/factoreal/backend/consumer/mqtt/MqttService.java rename to src/main/java/com/factoreal/backend/mqtt/MqttService.java index c9f55523..c6ca290a 100644 --- a/src/main/java/com/factoreal/backend/consumer/mqtt/MqttService.java +++ b/src/main/java/com/factoreal/backend/mqtt/MqttService.java @@ -1,4 +1,4 @@ -package com.factoreal.backend.consumer.mqtt; +package com.factoreal.backend.mqtt; import com.factoreal.backend.dto.EquipDto; import com.factoreal.backend.dto.SensorDto; @@ -43,6 +43,7 @@ public void SensorShadowSubscription() throws MqttException { // mqtt에서 전달되는 뎁스를 따라가야함 JsonNode reported = jsonNode.at("/current/state/reported"); log.info("📥 MQTT 수신 (topic: {}): {}", t, jsonNode); + String sensorId = reported.at("/sensorId").asText(); String type = reported.at("/type").asText(); String zoneId = reported.at("/zoneId").asText(); @@ -50,7 +51,9 @@ public void SensorShadowSubscription() throws MqttException { String equipIdVal = reported.path("equipId").asText(null); // 키가 없으면 null String equipId = (equipIdVal == null || equipIdVal.isBlank()) ? null : equipIdVal; - SensorDto dto = new SensorDto(sensorId, type , zoneId, equipId, null, null); + Integer iszone = equipId.equals(zoneId) ? 1 : 0; + + SensorDto dto = new SensorDto(sensorId, type , zoneId, equipId, null, null, iszone); sensorService.saveSensor(dto); // 중복이면 예외 발생 log.info("✅ 센서 저장 완료: {}", sensorId); } catch (DataIntegrityViolationException e) { diff --git a/src/main/java/com/factoreal/backend/producer/kafka/KafkaProducer.java b/src/main/java/com/factoreal/backend/producer/kafka/KafkaProducer.java deleted file mode 100644 index f370fc32..00000000 --- a/src/main/java/com/factoreal/backend/producer/kafka/KafkaProducer.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.factoreal.backend.producer.kafka; - -import com.factoreal.backend.strategy.enums.AlarmEventDto; -import com.fasterxml.jackson.databind.ObjectMapper; -import lombok.RequiredArgsConstructor; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.stereotype.Component; - -@Component -@RequiredArgsConstructor -public class KafkaProducer { - private final KafkaTemplate kafkaTemplate; - private final ObjectMapper objectMapper; - - public void sendAlarmEvent(String topic, AlarmEventDto event) { - try { - String message = objectMapper.writeValueAsString(event); - kafkaTemplate.send(topic, message); - } catch (Exception e) { - throw new RuntimeException("Failed to serialize AlarmEvent", e); - } - } -} diff --git a/src/main/java/com/factoreal/backend/sender/WebSocketSender.java b/src/main/java/com/factoreal/backend/sender/WebSocketSender.java index e8262f8f..b57e485d 100644 --- a/src/main/java/com/factoreal/backend/sender/WebSocketSender.java +++ b/src/main/java/com/factoreal/backend/sender/WebSocketSender.java @@ -2,7 +2,7 @@ import com.factoreal.backend.dto.SystemLogDto; import com.factoreal.backend.dto.ZoneDangerDto; -import com.factoreal.backend.strategy.enums.AlarmEventDto; +import com.factoreal.backend.kafka.strategy.enums.AlarmEventDto; import lombok.RequiredArgsConstructor; import org.springframework.messaging.simp.SimpMessagingTemplate; import org.springframework.stereotype.Component; @@ -22,7 +22,7 @@ public void sendDangerLevel(String zoneId, String sensorType, int level) { } /** - * 시스템 로그를 WebSocket으로 전송 + * Todo : 시스템 로그를 WebSocket으로 전송 -> restAPI 변경으로 삭제 예정 */ public void sendSystemLog(SystemLogDto logDto) { messagingTemplate.convertAndSend("/topic/system-log", logDto); @@ -35,6 +35,9 @@ public void sendDangerAlarm(AlarmEventDto alarmEventDto) { messagingTemplate.convertAndSend("/topic/alarm", alarmEventDto); } + /** + * 읽지 않은 알람수 전송 + */ public void sendUnreadCount(long count){ messagingTemplate.convertAndSend("/topic/unread-count", count); } diff --git a/src/main/java/com/factoreal/backend/service/AbnormalLogService.java b/src/main/java/com/factoreal/backend/service/AbnormalLogService.java index 7e49f5ce..d832a110 100644 --- a/src/main/java/com/factoreal/backend/service/AbnormalLogService.java +++ b/src/main/java/com/factoreal/backend/service/AbnormalLogService.java @@ -8,17 +8,19 @@ import com.factoreal.backend.entity.Zone; import com.factoreal.backend.repository.AbnLogRepository; import com.factoreal.backend.sender.WebSocketSender; -import com.factoreal.backend.strategy.RiskMessageProvider; -import com.factoreal.backend.strategy.enums.RiskLevel; -import com.factoreal.backend.strategy.enums.SensorType; +import com.factoreal.backend.kafka.strategy.alarmMessage.RiskMessageProvider; +import com.factoreal.backend.kafka.strategy.enums.RiskLevel; +import com.factoreal.backend.kafka.strategy.enums.SensorType; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Pageable; +import org.springframework.http.HttpStatus; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import org.springframework.web.server.ResponseStatusException; import java.time.LocalDateTime; @@ -35,13 +37,20 @@ public class AbnormalLogService { // 알람 객체를 받아와서 로그 객체 생성. @Transactional(rollbackFor = Exception.class) public AbnormalLog saveAbnormalLogFromKafkaDto( - SensorKafkaDto sensorKafkaDto, - SensorType sensorType, - RiskLevel riskLevel, - LogType targetType - ) throws Exception{ + SensorKafkaDto sensorKafkaDto, + SensorType sensorType, + RiskLevel riskLevel, + LogType targetType + ) throws Exception{ + + Zone zone = zoneService.getZone(sensorKafkaDto.getZoneId()); + + if (zone == null) { + throw new ResponseStatusException(HttpStatus.NOT_FOUND, "존재하지 않는 공간 ID: " + sensorKafkaDto.getZoneId()); + } + log.info(">>>>>> zone : {} " ,zone); // DTO의 severity (AlarmEvent.RiskLevel)를 Entity RiskLevel로 매핑 @@ -146,7 +155,7 @@ public boolean readCheck(Long abnormalLogId){ // 읽지 않은 알람이 몇개인지 반환 public Long readRequired(){ Long count = abnLogRepository.countByIsReadFalse(); - webSocketSender.sendUnreadCount(count); +// webSocketSender.sendUnreadCount(count); return count; } diff --git a/src/main/java/com/factoreal/backend/service/AlarmEventService.java b/src/main/java/com/factoreal/backend/service/AlarmEventService.java new file mode 100644 index 00000000..206c703f --- /dev/null +++ b/src/main/java/com/factoreal/backend/service/AlarmEventService.java @@ -0,0 +1,90 @@ +package com.factoreal.backend.service; + +import com.factoreal.backend.dto.SensorKafkaDto; +import com.factoreal.backend.entity.AbnormalLog; +import com.factoreal.backend.kafka.strategy.alarmList.NotificationStrategy; +import com.factoreal.backend.kafka.strategy.NotificationStrategyFactory; +import com.factoreal.backend.kafka.strategy.enums.AlarmEventDto; +import com.factoreal.backend.kafka.strategy.enums.RiskLevel; +import com.factoreal.backend.kafka.strategy.enums.SensorType; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import java.util.List; + +@Service +@Slf4j +@RequiredArgsConstructor +public class AlarmEventService { + + // 위험 레벨별 알람 전략을 가져오기 위한 팩토리 서비스 + private final NotificationStrategyFactory notificationStrategyFactory; + private final ZoneService zoneService; + // Todo 추후 Flink에서 SensorKafkaDto에 dangerLevel을 포함하면 제거 + public void startAlarm(SensorKafkaDto sensorData, AbnormalLog abnormalLog, int dangerLevel) { + AlarmEventDto alarmEventDto; + RiskLevel riskLevel = RiskLevel.fromPriority(dangerLevel); + // Todo Flink 수정시 주석 해제 + // RiskLevel riskLevel = RiskLevel.fromPriority(sensorData.getDangerLevel); + + try { + // 1. dangerLevel기준으로 alarmEvent 객체 생성. + alarmEventDto = generateAlarmDto(sensorData, abnormalLog, riskLevel); + } catch (Exception e) { + log.error("Error converting Kafka message: {}", e); + return; + } + // 1-1. AbnormalLog 기록. + try { + // 2. 생성된 AlarmEvent DTO 객체를 사용하여 알람 처리 + log.info("alarmEvent: {}", alarmEventDto.toString()); + processAlarmEvent(alarmEventDto); + } catch (Exception e) { + log.error("Error converting Kafka message: {}", e); + // TODO: 기타 처리 오류 처리 + } + } + private AlarmEventDto generateAlarmDto(SensorKafkaDto data, AbnormalLog abnormalLog, RiskLevel riskLevel) + throws Exception { + + String source = data.getZoneId().equals(data.getEquipId()) ? "공간 센서" : "설비 센서"; + SensorType sensorType = SensorType.valueOf(data.getSensorType()); + String zoneName = zoneService.getZone(data.getZoneId()).getZoneName(); + // 알람 이벤트 객체 반환 + return AlarmEventDto.builder() + .eventId(abnormalLog.getId()) + .sensorId(data.getSensorId()) + .equipId(data.getEquipId()) + .zoneId(data.getZoneId()) + .sensorType(sensorType.name()) + .messageBody(abnormalLog.getAbnormalType()) + .source(source) + .time(data.getTime()) + .riskLevel(riskLevel) + .zoneName(zoneName) + .build(); + } + private void processAlarmEvent(AlarmEventDto alarmEventDto) { + if (alarmEventDto == null || alarmEventDto.getRiskLevel() == null) { + log.warn("Received null AlarmEvent DTO or DTO with null severity. Skipping notification."); + return; + } + + try { + log.info("Processing AlarmEvent with mapped Entity RiskLevel: {}", alarmEventDto.getRiskLevel()); + + // 3. Factory를 사용하여 매핑된 Entity RiskLevel에 해당하는 NotificationStrategy를 가져와 실행 + List notificationStrategyList = + notificationStrategyFactory.getStrategiesForLevel(alarmEventDto.getRiskLevel()); + + log.info("💡Notification strategy executed for AlarmEvent. \n{}", alarmEventDto.toString()); + // 4. 알람 객체의 값으로 전략별 알람 송신. + notificationStrategyList.forEach(notificationStrategy -> notificationStrategy.send(alarmEventDto)); + + } catch (Exception e) { + log.error("Failed to execute notification strategy for AlarmEvent DTO: {}", alarmEventDto, e); + // TODO: 전략 실행 중 오류 처리 + } + } +} diff --git a/src/main/java/com/factoreal/backend/service/AutoControlService.java b/src/main/java/com/factoreal/backend/service/AutoControlService.java new file mode 100644 index 00000000..09fafc20 --- /dev/null +++ b/src/main/java/com/factoreal/backend/service/AutoControlService.java @@ -0,0 +1,60 @@ +package com.factoreal.backend.service; + +import com.factoreal.backend.dto.SensorKafkaDto; +import com.factoreal.backend.entity.Sensor; +import com.factoreal.backend.repository.SensorRepository; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +/** + * [공간센서제어] + * 공간 센서 측정값이 임계치를 벗어났는지 판단하여 자동 제어가 필요한 상황을 감지 + */ +@Service +@RequiredArgsConstructor +@Slf4j +public class AutoControlService { + + private final SensorRepository sensorRepository; + + /** + * 센서 값이 허용 범위를 벗어났을 경우 제어 메시지를 생성하거나 처리하도록 로깅 + */ + public void evaluate(SensorKafkaDto dto, int dangerLevel) { + if (dangerLevel == 0) return; // 정상 범위면 아무 처리 안 함 + + Sensor sensor = sensorRepository.findById(dto.getSensorId()) + .orElse(null); + + if (sensor == null) { + log.warn("❌ 센서 정보 조회 실패: sensorId={}", dto.getSensorId()); + return; + } + + double threshold = sensor.getSensorThres(); + double tolerance = sensor.getAllowVal() != null ? sensor.getAllowVal() : 0.0; + double value = dto.getVal(); + + if (value < threshold - tolerance || value > threshold + tolerance) { + String message = buildControlMessage(sensor.getSensorType().name(), value, threshold, tolerance); + log.info("⚙️ 자동제어 필요: {}", message); + // TODO: MQTT 퍼블리시 로직으로 대체 + } else { + log.info("✅ 측정값은 허용 범위 내: sensorId={}, value={}", dto.getSensorId(), value); + } + } + + + // 제어 로직 + private String buildControlMessage(String type, double val, double thresh, double tol) { + return switch (type.toLowerCase()) { + case "temp" -> String.format("현재 온도 %.1f℃, 적정 범위: %.1f~%.1f℃", val, thresh - tol, thresh + tol); + case "humid" -> String.format("현재 습도 %.1f%%, 적정 범위: %.1f~%.1f%%", val, thresh - tol, thresh + tol); + case "vibration" -> String.format("현재 진동 %.1fmm/s, 허용 범위: %.1f~%.1fmm/s", val, thresh - tol, thresh + tol); + case "current" -> String.format("현재 전류 %.1fmA, 허용 범위: %.1f~%.1fmA", val, thresh - tol, thresh + tol); + case "dust" -> String.format("현재 미세먼지 %.1f㎍/㎥, 허용 범위: %.1f~%.1f㎍/㎥", val, thresh - tol, thresh + tol); + default -> String.format("현재 값 %.1f, 허용 범위: %.1f~%.1f", val, thresh - tol, thresh + tol); + }; + } +} diff --git a/src/main/java/com/factoreal/backend/service/SensorService.java b/src/main/java/com/factoreal/backend/service/SensorService.java index e04794de..3e78d1d4 100644 --- a/src/main/java/com/factoreal/backend/service/SensorService.java +++ b/src/main/java/com/factoreal/backend/service/SensorService.java @@ -4,7 +4,7 @@ import java.util.Optional; import java.util.stream.Collectors; -import com.factoreal.backend.strategy.enums.SensorType; +import com.factoreal.backend.kafka.strategy.enums.SensorType; import org.springframework.http.HttpStatus; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -52,6 +52,7 @@ public Sensor saveSensor(SensorDto dto) { sens.setSensorType(SensorType.valueOf(dto.getSensorType())); sens.setZone(zone); sens.setEquip(equip.get()); + sens.setIsZone(dto.getIsZone()); return repo.save(sens); } @@ -64,7 +65,8 @@ public List getAllSensors() { s.getZone().getZoneId(), s.getEquip().getEquipId(), s.getSensorThres(), - s.getAllowVal() + s.getAllowVal(), + s.getIsZone() )) .collect(Collectors.toList()); } diff --git a/src/main/java/com/factoreal/backend/strategy/NotificationStrategy.java b/src/main/java/com/factoreal/backend/strategy/NotificationStrategy.java deleted file mode 100644 index c418898b..00000000 --- a/src/main/java/com/factoreal/backend/strategy/NotificationStrategy.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.factoreal.backend.strategy; - -import com.factoreal.backend.strategy.enums.AlarmEventDto; -import com.factoreal.backend.strategy.enums.RiskLevel; - -public interface NotificationStrategy { - void send(AlarmEventDto alarmEventDto); - // 이 인터페이스를 상속받는 객체가 동작할 위험 레벨을 설정. - RiskLevel getSupportedLevel(); -} diff --git a/src/main/java/com/factoreal/backend/strategy/RiskMessageProvider.java b/src/main/java/com/factoreal/backend/strategy/RiskMessageProvider.java deleted file mode 100644 index 1c370437..00000000 --- a/src/main/java/com/factoreal/backend/strategy/RiskMessageProvider.java +++ /dev/null @@ -1,8 +0,0 @@ -package com.factoreal.backend.strategy; - -import com.factoreal.backend.strategy.enums.RiskLevel; -import com.factoreal.backend.strategy.enums.SensorType; - -public interface RiskMessageProvider { - String getMessage(SensorType sensorType, RiskLevel riskLevel); -} diff --git a/src/main/java/com/factoreal/backend/strategy/WebPushNotificationStrategy.java b/src/main/java/com/factoreal/backend/strategy/WebPushNotificationStrategy.java deleted file mode 100644 index d3d819be..00000000 --- a/src/main/java/com/factoreal/backend/strategy/WebPushNotificationStrategy.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.factoreal.backend.strategy; - -import com.factoreal.backend.strategy.enums.AlarmEventDto; -import com.factoreal.backend.strategy.enums.RiskLevel; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -@Slf4j -@Component("WEB") -// https://stir.tistory.com/516를 참고하여 구현 -public class WebPushNotificationStrategy implements NotificationStrategy { - @Override - public void send(AlarmEventDto alarmEventDto) { - log.info("🍿 Web Push Notification Strategy"); - } - - @Override - public RiskLevel getSupportedLevel() { - return RiskLevel.INFO; - } -} diff --git a/src/main/java/com/factoreal/backend/strategy/enums/AlarmType.java b/src/main/java/com/factoreal/backend/strategy/enums/AlarmType.java deleted file mode 100644 index 5020fc8f..00000000 --- a/src/main/java/com/factoreal/backend/strategy/enums/AlarmType.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.factoreal.backend.strategy.enums; - -import lombok.Getter; - -@Getter -public enum AlarmType { - HIGH_TEMP("온도 이상", "온도가 기준치를 초과했습니다."), - HIGH_DUST("먼지 농도 이상", "먼지 농도가 기준치를 초과했습니다."), - HIGH_VIBRATION("진동 감지", "진동이 감지되었습니다."), - LOW_HUMIDITY("습도 낮음", "습도가 기준치 이하입니다."), - VOC_DETECTED("유해가스 감지", "VOC(휘발성 유기화합물)가 감지되었습니다."), - OVER_CURRENT("전류 초과", "기기의 전류 사용량이 비정상적입니다."); - - private final String title; - private final String message; - - AlarmType(String title, String message) { - this.title = title; - this.message = message; - } - -} diff --git a/src/main/resources/application-local.yml b/src/main/resources/application-local.yml index 3c49606d..f1aef477 100644 --- a/src/main/resources/application-local.yml +++ b/src/main/resources/application-local.yml @@ -5,7 +5,7 @@ spring: on-profile: local kafka: - bootstrap-servers: localhost:9092 + bootstrap-servers: ${KAFKA_HOST}:9092 consumer: group-id: default-consumer-group-1