Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/main/java/com/factoreal/backend/dto/SensorDto.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public class SensorDto { // BE -> FE ์šฉ DTO
private String equipId;
private Double sensorThres; // ์ž„๊ณ„์น˜
private Double allowVal; // ํ—ˆ์šฉ์น˜
private Integer isZone;

public static SensorDto fromEntity(Sensor sensor) {
if (sensor == null) return null;
Expand All @@ -26,6 +27,7 @@ public static SensorDto fromEntity(Sensor sensor) {
.equipId(sensor.getEquip().getEquipId())
.sensorThres(sensor.getSensorThres())
.allowVal(sensor.getAllowVal())
.isZone(sensor.getIsZone())
.build();
}
}
18 changes: 11 additions & 7 deletions src/main/java/com/factoreal/backend/dto/SensorKafkaDto.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@
@Setter
@NoArgsConstructor
@AllArgsConstructor
// Kafka mapper์šฉ DTO
/**
* Kafka Sensor ๋ฐ์ดํ„ฐ mapper์šฉ DTO
*/
public class SensorKafkaDto {
private String zoneId;
private String equipId;
private String sensorId;
private String sensorType;
private Double val; // ์ธก์ •๊ฐ’ ๋‹จ์œ„
private String time; // ์„ผ์„œ ์ƒ์„ฑ์‹œ๊ฐ„
private String zoneId; // ๊ณต๊ฐ„ ID
private String equipId; // ์„ค๋น„ ID
private String sensorId; // ์„ผ์„œ ID
private String sensorType; // ์„ผ์„œ ํƒ€์ž…
private Double val; // ์„ผ์„œ ์ธก์ •๊ฐ’
private String Category; // ์„ผ์„œ ์นดํ…Œ๊ณ ๋ฆฌ {ENVIRONMENT, EQUIPMENT}
private String time; // ์„ผ์„œ ํƒ์ง€ ์‹œ๊ฐ„
private int dangerLevel; // ์œ„ํ—˜๋„
}
2 changes: 1 addition & 1 deletion src/main/java/com/factoreal/backend/entity/Sensor.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.factoreal.backend.entity;

import com.factoreal.backend.strategy.enums.SensorType;
import com.factoreal.backend.kafka.strategy.enums.SensorType;
import jakarta.persistence.*;
import lombok.*;
import java.time.LocalDateTime;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
package com.factoreal.backend.consumer.kafka;
package com.factoreal.backend.kafka;

import com.factoreal.backend.dto.SensorKafkaDto;
import com.factoreal.backend.dto.abnormalLog.LogType;
import com.factoreal.backend.entity.AbnormalLog;
import com.factoreal.backend.dto.SystemLogDto;
import com.factoreal.backend.entity.Zone;
import com.factoreal.backend.sender.WebSocketSender;
import com.factoreal.backend.service.ZoneService;
import com.factoreal.backend.entity.AbnormalLog;
import com.factoreal.backend.service.AbnormalLogService;
import com.factoreal.backend.strategy.NotificationStrategy;
import com.factoreal.backend.strategy.NotificationStrategyFactory;
import com.factoreal.backend.strategy.enums.AlarmEventDto;
import com.factoreal.backend.strategy.enums.RiskLevel;
import com.factoreal.backend.strategy.enums.SensorType;
import com.factoreal.backend.kafka.strategy.alarmList.NotificationStrategy;
import com.factoreal.backend.kafka.strategy.NotificationStrategyFactory;
import com.factoreal.backend.kafka.strategy.enums.AlarmEventDto;
import com.factoreal.backend.kafka.strategy.enums.RiskLevel;
import com.factoreal.backend.kafka.strategy.enums.SensorType;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
Expand All @@ -22,7 +20,6 @@
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import com.factoreal.backend.service.SensorService;
Expand All @@ -31,19 +28,15 @@
import java.time.ZoneId;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;
import java.util.Objects;

@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaConsumer {
public class KafkaConsumerD {

private final ObjectMapper objectMapper;
private final WebSocketSender webSocketSender;
Expand All @@ -69,8 +62,8 @@ public class KafkaConsumer {

// @KafkaListener(topics = {"EQUIPMENT", "ENVIRONMENT"}, groupId =
// "monitory-consumer-group-1")
@KafkaListener(topics = { "EQUIPMENT",
"ENVIRONMENT" }, groupId = "${spring.kafka.consumer.group-id:danger-alert-group}")
// @KafkaListener(topics = { "EQUIPMENT",
// "ENVIRONMENT" }, groupId = "${spring.kafka.consumer.group-id:danger-alert-group}")
public void consume(String message) {

log.info("๐Ÿ’ก์ˆ˜์‹ ํ•œ Kafka ๋ฉ”์‹œ์ง€ : " + message);
Expand All @@ -87,6 +80,7 @@ public void consume(String message) {
// ๋น„๋™๊ธฐ ES ์ €์žฅ
// #################################
saveToElasticsearch(dto);

log.info("โ–ถ๏ธŽ ์œ„ํ—˜๋„ ๊ฐ์ง€ start");
int dangerLevel = getDangerLevel(dto.getSensorType(), dto.getVal());
log.info("โš ๏ธ ์œ„ํ—˜๋„ {} ์„ผ์„œ ํƒ€์ž… : {} ๊ฐ์ง€๋จ. Zone: {}", dangerLevel, dto.getSensorType(), dto.getZoneId());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.factoreal.backend.kafka.consumer;

import com.factoreal.backend.dto.SensorKafkaDto;
import com.factoreal.backend.kafka.processor.SensorEventProcessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;


/**
* KafkaConsumer ํด๋ž˜์Šค๋Š” Kafka ํ† ํ”ฝ์œผ๋กœ๋ถ€ํ„ฐ ๋ฉ”์‹œ์ง€๋ฅผ ์ˆ˜์‹ ํ•˜๊ณ ,
* ํ•ด๋‹น ๋ฉ”์‹œ์ง€๋ฅผ SensorEventProcessor.java ์ „๋‹ฌ
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class KafkaConsumer {

private final ObjectMapper objectMapper;
private final SensorEventProcessor sensorEventProcessor;

// ์„ค๋น„ ์„ผ์„œ ๊ด€๋ จ Kafka ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ
// Todo : ์„ค๋น„ ๋จธ์‹ ๋Ÿฌ๋‹ ๋๋‚˜๊ณ  ์ˆ˜์ • ์˜ˆ์ •
// @KafkaListener(topics = "EQUIPMENT", groupId = "equipment-consumer-group")
public void consumeEquipment(String message) {
log.info("๐Ÿ“ฉ [EQUIPMENT] Kafka ๋ฉ”์‹œ์ง€ ์ˆ˜์‹ : {}", message);
handleMessage(message, "EQUIPMENT");
}

// ๊ณต๊ฐ„ ์„ผ์„œ ๊ด€๋ จ Kafka ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ
@KafkaListener(topics = "ENVIRONMENT", groupId = "environment-consumer-group")
public void consumeEnvironment(String message) {
log.info("๐Ÿ“ฉ [ENVIRONMENT] Kafka ๋ฉ”์‹œ์ง€ ์ˆ˜์‹ : {}", message);
handleMessage(message, "ENVIRONMENT");
}

// ๊ณตํ†ต ๋ฉ”์‹œ์ง€ ํŒŒ์‹ฑ ๋ฐ ์ฒ˜๋ฆฌ ์ „๋‹ฌ
private void handleMessage(String message, String topic) {
try {
SensorKafkaDto dto = objectMapper.readValue(message, SensorKafkaDto.class);
log.info("โœ… Kafka ๋ฉ”์‹œ์ง€ ํŒŒ์‹ฑ ์™„๋ฃŒ: sensorId={}, zoneId={}, val={}",
dto.getSensorId(), dto.getZoneId(), dto.getVal());
sensorEventProcessor.process(dto, topic);
log.info("โœ… Kafka ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ ์œ„์ž„ ์™„๋ฃŒ: topic={}", topic);
} catch (Exception e) {
log.error("โŒ Kafka ๋ฉ”์‹œ์ง€ ํŒŒ์‹ฑ ๋˜๋Š” ์ฒ˜๋ฆฌ ์‹คํŒจ: {}", message, e);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package com.factoreal.backend.kafka.processor;

import com.factoreal.backend.dto.SensorKafkaDto;
import com.factoreal.backend.dto.abnormalLog.LogType;
import com.factoreal.backend.entity.AbnormalLog;
import com.factoreal.backend.sender.WebSocketSender;
import com.factoreal.backend.service.AbnormalLogService;
import com.factoreal.backend.service.AlarmEventService;
import com.factoreal.backend.service.AutoControlService;
import com.factoreal.backend.kafka.strategy.enums.RiskLevel;
import com.factoreal.backend.kafka.strategy.enums.SensorType;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

/**
* SensorEventProcessor ํด๋ž˜์Šค๋Š” Kafka๋กœ๋ถ€ํ„ฐ ์ „๋‹ฌ๋ฐ›์€ ์„ผ์„œ ๋ฐ์ดํ„ฐ๋ฅผ ๋ถ„์„ ๋ฐ ์ฒ˜๋ฆฌํ•˜๋Š” ํด๋ž˜์Šค์ž…๋‹ˆ๋‹ค.
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class SensorEventProcessor {

private final AutoControlService autoControlService;
private final AbnormalLogService abnormalLogService;
private final AlarmEventService alarmEventService;
private final WebSocketSender webSocketSender;

/**
* ์„ผ์„œ Kafka ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ
* @param dto ์„ผ์„œ ๋ฐ์ดํ„ฐ
* @param topic Kafka ํ† ํ”ฝ๋ช… (EQUIPMENT, ENVIRONMENT)
*/
public void process(SensorKafkaDto dto, String topic) {
try {

// ์œ ํšจ์„ฑ ๊ฒ€์‚ฌ: zoneId์™€ sensorId๋Š” ํ•„์ˆ˜
if (dto.getZoneId() == null || dto.getZoneId().isBlank()) {
log.warn("โš ๏ธ ์œ ํšจํ•˜์ง€ ์•Š์€ zoneId: null ๋˜๋Š” ๋นˆ ๋ฌธ์ž์—ด");
return;
}
if (dto.getSensorId() == null || dto.getSensorId().isBlank()) {
log.warn("โš ๏ธ ์œ ํšจํ•˜์ง€ ์•Š์€ sensorId: null ๋˜๋Š” ๋นˆ ๋ฌธ์ž์—ด");
return;
}

// ENVIRONMENT ํ† ํ”ฝ์ธ ๊ฒฝ์šฐ์—๋งŒ ์•„๋ž˜ ์ฒ˜๋ฆฌ ๋กœ์ง ์ˆ˜ํ–‰
if ("ENVIRONMENT".equalsIgnoreCase(topic)) {
if (dto.getEquipId() == null || !dto.getEquipId().equals(dto.getZoneId())) {
log.warn("โš ๏ธ ENVIRONMENT ํ† ํ”ฝ์ด์ง€๋งŒ equipId์™€ zoneId๊ฐ€ ์ผ์น˜ํ•˜์ง€ ์•Š์Œ: equipId={}, zoneId={}",
dto.getEquipId(), dto.getZoneId());
return;
}

// ์œ„ํ—˜๋„ ๊ณ„์‚ฐ
int dangerLevel = dto.getDangerLevel();
SensorType sensorType = SensorType.getSensorType(dto.getSensorType());
RiskLevel riskLevel = RiskLevel.fromPriority(dangerLevel);
LogType logType = topicToLogType(topic);

// ์ž๋™ ์ œ์–ด ๋ฉ”์‹œ์ง€ ํŒ๋‹จ (Todo - ์ง„ํ–‰์ค‘)
autoControlService.evaluate(dto, dangerLevel);

// ์ด์ƒ ๋กœ๊ทธ ์ €์žฅ
AbnormalLog abnLog = abnormalLogService.saveAbnormalLogFromKafkaDto(
dto, sensorType, riskLevel, logType);


// ์ฝ์ง€ ์•Š์€ ์•Œ๋ฆผ ์ˆ˜ ์กฐํšŒ
Long count = abnormalLogService.readRequired();

// WebSocket ์•Œ๋ฆผ ์ „์†ก
// 1. ํžˆํŠธ๋งต ์ „์†ก
webSocketSender.sendDangerLevel(dto.getZoneId(), dto.getSensorType(), dangerLevel);
// 2. ์œ„ํ—˜ ์•Œ๋ฆผ ์ „์†ก -> ์œ„ํ—˜๋„๋ณ„ Websocket + wearable + Slack(SMS ๋Œ€์ฒด)
// Todo : (As-is) ์ „๋žต ๊ธฐ๋ฐ˜ startAlarm() ๋ฉ”์„œ๋“œ ๋‹ด๋‹น์ž ํ™•์ธ ํ•„์š”
// webSocketSender.sendDangerAlarm(abnLog.toAlarmEventDto());
alarmEventService.startAlarm(dto,abnLog,dangerLevel);
// 3. ์ฝ์ง€ ์•Š์€ ์ˆ˜ ์ „์†ก
webSocketSender.sendUnreadCount(count);



log.info("โœ… ์„ผ์„œ ์ด๋ฒคํŠธ ์ฒ˜๋ฆฌ ์™„๋ฃŒ: sensorId={}, zoneId={}, level={} ({} topic)",
dto.getSensorId(), dto.getZoneId(), dangerLevel, topic);
}
} catch (Exception e) {
log.error("โŒ ์„ผ์„œ ์ด๋ฒคํŠธ ์ฒ˜๋ฆฌ ์‹คํŒจ: sensorId={}, zoneId={}", dto.getSensorId(), dto.getZoneId(), e);
}
}

// ๊ณต๊ฐ„์— ์œ„ํ—˜๋„ ๋ถ„๊ธฐ ๋กœ์ง
// Todo : Flink์—์„œ ์ ์šฉ์œผ๋กœ ๋ณ€๊ฒฝ๋˜์–ด ์‚ญ์ œ ์˜ˆ์ •
private int getDangerLevel(String sensorType, double val) {
switch (sensorType.toLowerCase()) {
case "temp": return val > 40 || val < -35 ? 2 : (val > 30 || val < 25 ? 1 : 0);
case "humid": return val >= 80 ? 2 : (val >= 60 ? 1 : 0);
case "vibration": return val > 7.1 ? 2 : (val > 2.8 ? 1 : 0);
case "current": return val >= 30 ? 2 : (val >= 7 ? 1 : 0);
case "dust": return val >= 150 ? 2 : (val >= 75 ? 1 : 0);
default: return 0;
}
}

// topic enum ๋ณ€๊ฒฝํ•˜๊ธฐ
private LogType topicToLogType(String topic) {
return switch (topic.toUpperCase()) {
case "EQUIPMENT" -> LogType.Equip;
case "ENVIRONMENT" -> LogType.Sensor;
default -> throw new IllegalArgumentException("์ง€์›ํ•˜์ง€ ์•Š๋Š” Kafka ํ† ํ”ฝ: " + topic);
};
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.factoreal.backend.strategy;
package com.factoreal.backend.kafka.strategy;

import com.factoreal.backend.strategy.enums.RiskLevel;
import com.factoreal.backend.kafka.strategy.alarmList.NotificationStrategy;
import com.factoreal.backend.kafka.strategy.enums.RiskLevel;
import org.springframework.stereotype.Component;

import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package com.factoreal.backend.strategy;
package com.factoreal.backend.kafka.strategy.alarmList;

import com.factoreal.backend.strategy.enums.AlarmEventDto;
import com.factoreal.backend.strategy.enums.RiskLevel;
import com.factoreal.backend.kafka.strategy.enums.AlarmEventDto;
import com.factoreal.backend.kafka.strategy.enums.RiskLevel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component("APP")
public class AppPushNotificationStrategy implements NotificationStrategy {


// TODO FCM ์ „์†ก ๋กœ์ง
@Override
public void send(AlarmEventDto alarmEventDto) {
log.info("๐Ÿ“ฒ App Push Notification Strategy.");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.factoreal.backend.kafka.strategy.alarmList;

import com.factoreal.backend.kafka.strategy.enums.AlarmEventDto;
import com.factoreal.backend.kafka.strategy.enums.RiskLevel;

public interface NotificationStrategy {
void send(AlarmEventDto alarmEventDto);
// ์ด ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ์ƒ์†๋ฐ›๋Š” ๊ฐ์ฒด๊ฐ€ ๋™์ž‘ํ•  ์ตœ์†Œ ์œ„ํ—˜ ๋ ˆ๋ฒจ์„ ์„ค์ •
// Kafka๋กœ ๋ฐ›์•„์˜จ ์„ผ์„œ ๋ฐ์ดํ„ฐ์™€ ๋น„๊ตํ•˜๊ธฐ ์œ„ํ•จ.
// ์„ผ์„œ์˜ Danger Level๋ณด๋‹ค ๊ฐ™๊ฑฐ๋‚˜ ๋‚ฎ์€ Strategy๋“ค์ด ๋ฐ˜ํ™˜๋˜๊ธฐ ์œ„ํ•จ.
RiskLevel getSupportedLevel();
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.factoreal.backend.strategy;
package com.factoreal.backend.kafka.strategy.alarmList;

import com.factoreal.backend.strategy.enums.AlarmEventDto;
import com.factoreal.backend.kafka.strategy.enums.AlarmEventDto;
import com.factoreal.backend.entity.Worker;
import com.factoreal.backend.strategy.enums.RiskLevel;
import com.factoreal.backend.kafka.strategy.enums.RiskLevel;
import com.factoreal.backend.repository.WorkerRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -22,6 +22,7 @@ public class SmsNotificationStrategy implements NotificationStrategy {
private final WorkerRepository workerRepository;

private static final String userId = "alarm-test";
// TODO Seoul ๋ฆฌ์ „์— SMS ์ง€์›์ด ์•ˆ๋˜๊ธฐ์— Slack์œผ๋กœ ๋ณ€๊ฒฝ
@Override
public void send(AlarmEventDto alarmEventDto) {
log.info("๐Ÿ“ฌ SMS Notification Strategy.");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
package com.factoreal.backend.strategy;
package com.factoreal.backend.kafka.strategy.alarmList;

import com.factoreal.backend.sender.WebSocketSender;
import com.factoreal.backend.strategy.enums.AlarmEventDto;
import com.factoreal.backend.strategy.enums.RiskLevel;
import com.factoreal.backend.kafka.strategy.enums.AlarmEventDto;
import com.factoreal.backend.kafka.strategy.enums.RiskLevel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@RequiredArgsConstructor
@Component
@Slf4j
public class WebSocketNotificationStrategy implements NotificationStrategy{
public class WebSocketNotificationStrategy implements NotificationStrategy {
// SimpMessagingTemplate์€ WebSocketConfig.java์— EnableWebSocketMessageBroker ์–ด๋…ธํ…Œ์ด์…˜์— ์˜ํ•ด ๋นˆ์ด ๋“ฑ๋ก๋จ.
private final WebSocketSender webSocketSender;

private static final String userId = "alarm-test";

@Override
public void send(AlarmEventDto alarmEventDto) {
log.info("๐ŸŒWebSocket Notification Strategy");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.factoreal.backend.strategy;
package com.factoreal.backend.kafka.strategy.alarmMessage;

import com.factoreal.backend.strategy.enums.RiskLevel;
import com.factoreal.backend.strategy.enums.SensorType;
import com.factoreal.backend.kafka.strategy.enums.RiskLevel;
import com.factoreal.backend.kafka.strategy.enums.SensorType;
import org.springframework.stereotype.Component;

@Component
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.factoreal.backend.kafka.strategy.alarmMessage;

import com.factoreal.backend.kafka.strategy.enums.RiskLevel;
import com.factoreal.backend.kafka.strategy.enums.SensorType;

public interface RiskMessageProvider {
String getMessage(SensorType sensorType, RiskLevel riskLevel);
}
Loading