diff --git a/.vscode/launch.json b/.vscode/launch.json index 40cda35..a75cef1 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -2,13 +2,19 @@ "version": "0.2.0", "configurations": [ { - "name": "Python: Module Debug", + "name": "Streamlit: Debug", "type": "debugpy", "request": "launch", - "module": "service.simulation.simulateTest", // 실행할 모듈 경로 - "cwd": "${workspaceFolder}", // 작업 디렉토리 - "console": "integratedTerminal", // 통합 터미널에서 실행 - "justMyCode": false // 외부 라이브러리 코드도 디버깅 + "module": "streamlit", // Streamlit 모듈 실행 + "args": [ + "run", "${workspaceFolder}/streamlit_app/app.py" // Streamlit 앱의 진입점 + ], + "console": "integratedTerminal", // 통합 터미널에서 실행 + "cwd": "${workspaceFolder}", // 작업 디렉토리 + "env": { + "PYTHONPATH": "${workspaceFolder}" // PYTHONPATH 설정 + }, + "justMyCode": false // 외부 라이브러리 코드도 디버깅 } ] } \ No newline at end of file diff --git a/mqtt_util/publish.py b/mqtt_util/publish.py index 912a8dc..99a6595 100644 --- a/mqtt_util/publish.py +++ b/mqtt_util/publish.py @@ -52,7 +52,7 @@ def publish(self , topic, payload, qos): topic = topic, payload = payload, qos = qos) - print(f"Published: {payload} to topic: {topic}") + # print(f"Published: {payload} to topic: {topic}") def subscribe(self, topic, qos, callback): print(f"topic Subscribe: {topic} ") diff --git a/service/__init__.py b/service/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/service/sensor/RealSensor.py b/service/sensor/RealSensor.py new file mode 100644 index 0000000..e37c7e3 --- /dev/null +++ b/service/sensor/RealSensor.py @@ -0,0 +1,94 @@ +# real_sensor_publishers.py +import threading, serial, time, json +from awscrt import mqtt +from service.simulation.SimulatorInterface2 import SimulatorInterface2 +from mqtt_util.publish import AwsMQTT + +class RealSensor(SimulatorInterface2): + def __init__(self, idx, zone_id, equip_id, interval, msg_count, conn=None, stop_event=None): + super().__init__(idx, zone_id, equip_id, interval, msg_count, conn=conn) + self._is_publishing = False # 중복 실행 방지 플래그 + self.stop_event = stop_event if stop_event else threading.Event() + + # (1) 센서 고유 ID + self.sensor_id = f"UA10H-REAL-24060999" + + # (2) shadow 토픽 + self.shadow_regist_topic_name = f"$aws/things/Sensor/shadow/name/{self.sensor_id}/update" + self.shadow_desired_topic_name = f"$aws/things/Sensor/shadow/name/{self.sensor_id}/update/desired" + + # (3) **온도·습도용 토픽 미리 생성** + self.topic_name_temp = self._build_topic(self.zone_id, self.equip_id, self.sensor_id, "temp") + self.topic_name_humid = self._build_topic(self.zone_id, self.equip_id, self.sensor_id, "humid") + # 시리얼 포트 설정 + self.serial_port = 'COM3' # Windows COM 포트 + self.baudrate = 9600 # 바우드 + + ######## 오버라이딩은 하되 내용은 필요없는 메서드 ######## + def _generate_data(self): + # RealSensor 에선 이 메서드 대신 _read_and_publish_loop 을 씁니다. + raise NotImplementedError + + def _apply_desired_state(self, _): + # 아직 shadow 제어 명령 처리 필요 없으면 그냥 pass + pass + ################################################# + + def start_publishing(self): + """실 센서용 읽기+퍼블리시 루프를 스레드로 돌립니다.""" + # 이미 시작했으면 다시 만들지 않음 + # if getattr(self, "_started", False): + # print("[RealSensor] already running – second call ignored") + # return None + + if self._is_publishing: + print("[RealSensor] Publishing is already running. Ignoring second call.") + return None + + self._is_publishing = True + + thread = threading.Thread(target=self._read_and_publish_loop, daemon=False) + thread.start() + return thread + + def _read_and_publish_loop(self): + try: + # 1) 시리얼 열고 대기 + ser = serial.Serial(self.serial_port, self.baudrate, timeout=1) + time.sleep(2) + + self.type = "real_sensor" + # 2) shadow ON + self._update_shadow("ON") + self._subscribe_to_shadow_desired() + + # 3) msg_count 번만큼 읽어서 퍼블리시 + for _ in range(self.msg_count): + # stop_event 확인 - 중지 요청 있으면 루프 종료 + if self.stop_event.is_set(): + print(f"[RealSensor] Stopping due to stop_event") + break + + line = ser.readline().decode().strip() + if not line.startswith("STREAM"): + continue + _, tmp, hmd = line.split(",") + temperature = round(float(tmp), 2) + humidity = round(float(hmd), 2) + + # → 여기만 교체 + self.publish_value("temp", temperature) + self.publish_value("humid", humidity) + + # print(f"[{time.strftime('%H:%M:%S')}] temp: {temperature}, humid: {humidity} {threading.current_thread().name}") + time.sleep(self.interval) + + finally: + # ▼ 반드시 호출돼서 핸들을 반납 + try: + ser.close() + except Exception: + pass # 이미 닫혔으면 무시 + self._update_shadow("OFF") + self._is_publishing = False + # self._started = False # 다음 실행을 위해 플래그 해제 \ No newline at end of file diff --git a/service/sensor/__init__.py b/service/sensor/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/service/sensor/sensorTest.py b/service/sensor/sensorTest.py index e0ed8ac..09ada39 100644 --- a/service/sensor/sensorTest.py +++ b/service/sensor/sensorTest.py @@ -16,7 +16,7 @@ time.sleep(1) # 연결 대기 flag = 0 -max_cnt = 30 #30번번 # 몇 번 받아올건지 +max_cnt = 15 #30번번 # 몇 번 받아올건지 # query = "INSERT INTO ua10_table (temperature, humidity) VALUES (%s, %s)" publisher = AwsMQTT() @@ -56,6 +56,8 @@ stm, tmp, hmd = line.split(",") payload = json.dumps({ "id": "UA10H-CHS-24060894", + "equip_id": "20250507165750-827", + "zone_id": "20250507165750-827", "type": "온습도", "temperature": float(tmp), "humidity": float(hmd) diff --git a/service/simulatelogic/ContinuousSimulatorMixin.py b/service/simulatelogic/ContinuousSimulatorMixin.py new file mode 100644 index 0000000..7432951 --- /dev/null +++ b/service/simulatelogic/ContinuousSimulatorMixin.py @@ -0,0 +1,40 @@ +# continuous_simulator.py +import random +from scipy.stats import truncnorm + +class ContinuousSimulatorMixin: + """평균 회귀 + 작은 노이즈 + 희박한 이상치 발생 로직을 공유""" + # ── 하위 클래스가 오버라이드할 매개변수 ─────────────────── + SENSOR_TYPE = None # "temp" / "humid" / ... + MU = None # 평균 + SIGMA = None # 표준편차 + LOWER = None # 최소 허용값 + UPPER = None # 최대 허용값 + OUTLIER_P = 0.03 # 기본 이상치 확률 3% + DRIFT_THETA = 0.1 # 평균 회귀 강도 ** 데이터 정상범위 유지하는 중요 데이터터 + SMALL_SIGMA_RATIO = 0.1 # 정상 구간 변동폭 (σ의 10 %) + + # ── 내부 상태 초기화 ───────────────────────────────────── + def _reset_state(self): + a, b = (self.LOWER - self.MU) / self.SIGMA, (self.UPPER - self.MU) / self.SIGMA + first = truncnorm.rvs(a, b, loc=self.MU, scale=self.SIGMA/3) + self.prev_val = round(first, 2) + + # ── 핵심 데이터 생성 로직 ───────────────────────────────── + def _generate_continuous_val(self): + if not hasattr(self, "prev_val"): + self._reset_state() + + # 1) 평균 회귀 + 작은 노이즈 + mean_revert = self.MU + (self.prev_val - self.MU) * (1 - self.DRIFT_THETA) + small_sigma = self.SIGMA * self.SMALL_SIGMA_RATIO + val = random.gauss(mean_revert, small_sigma) + + # 2) 이상치 + if random.random() < self.OUTLIER_P: + val = random.gauss(self.MU, self.SIGMA) + + # 3) 범위 클램프 & 저장 + val = round(max(self.LOWER, min(self.UPPER, val)), 2) + self.prev_val = val + return val diff --git a/service/simulation/CurrentSimulator.py b/service/simulation/CurrentSimulator.py index ff63f5d..d3c63e6 100644 --- a/service/simulation/CurrentSimulator.py +++ b/service/simulation/CurrentSimulator.py @@ -1,8 +1,14 @@ from .SimulatorInterface2 import SimulatorInterface2 -from simulate_type.simulate_list import generate_current_data -from scipy.stats import truncnorm +from service.simulatelogic.ContinuousSimulatorMixin import ContinuousSimulatorMixin + +class CurrentSimulator(ContinuousSimulatorMixin,SimulatorInterface2): + # 타입별 시뮬레이터 세팅 + SENSOR_TYPE = "current" # 센서 타입 + # MU, SIGMA = 62.51, 33.76 + MU, SIGMA = 5, 30 # 평균치 , 표준 편차 + LOWER, UPPER = 1,50 # 측정 범위 + OUTLIER_P = 0.10 # 10 % 확률로 경보 값 생성 -class CurrentSimulator(SimulatorInterface2): def __init__(self, idx: int, zone_id:str, equip_id:str, interval:int = 5, msg_count:int = 10, conn=None): # 시뮬레이터에서 공통적으로 사용하는 속성 super().__init__( @@ -18,14 +24,14 @@ def __init__(self, idx: int, zone_id:str, equip_id:str, interval:int = 5, msg_co self.type = "current" # 센서 타입 self.shadow_regist_topic_name = f"$aws/things/Sensor/shadow/name/{self.sensor_id}/update" self.shadow_desired_topic_name = f"$aws/things/Sensor/shadow/name/{self.sensor_id}/update/desired" - self.topic_name = f"sensor/{zone_id}/{equip_id}/{self.sensor_id}/{self.type}" + self.topic_name = self._build_topic(zone_id, equip_id,self.sensor_id, self.type) self.target_current = None # 초기값 설정(shadow 용) - self.mu = 62.51 - self.sigma = 33.76 - lower = 0 - upper = self.mu + 3 * self.sigma - self.a = (lower - self.mu) / self.sigma - self.b = (upper - self.mu) / self.sigma + # self.mu = 62.51 + # self.sigma = 33.76 + # lower = 0 + # upper = self.mu + 3 * self.sigma + # self.a = (lower - self.mu) / self.sigma + # self.b = (upper - self.mu) / self.sigma # 데이터 생성 로직 정의 @@ -36,7 +42,8 @@ def _generate_data(self) -> dict: "sensorId": self.sensor_id, "sensorType": self.type, # "val": round(random.uniform(0.1 + self.idx, 10.0 + self.idx), 2) - "val": round(truncnorm.rvs(self.a, self.b, loc=self.mu, scale=self.sigma), 2) # 0: 7, 1: 7이상, 2: 30 이상 최소값은 0 + "val": self._generate_continuous_val() + # 0: 7, 1: 7이상, 2: 30 이상 최소값은 0 } ################################################ diff --git a/service/simulation/DustSimulator.py b/service/simulation/DustSimulator.py index 18f9fb8..643eac3 100644 --- a/service/simulation/DustSimulator.py +++ b/service/simulation/DustSimulator.py @@ -1,8 +1,13 @@ from .SimulatorInterface2 import SimulatorInterface2 -import random -from scipy.stats import truncnorm +from service.simulatelogic.ContinuousSimulatorMixin import ContinuousSimulatorMixin + +class DustSimulator(ContinuousSimulatorMixin,SimulatorInterface2): + # dust_simulator.py (Mixin 상수 부분만) + SENSOR_TYPE = "dust" # ㎍/㎥ + MU, SIGMA = 50, 25 # 평균 50 ㎍/㎥, σ = 25 + LOWER, UPPER = 0, 300 # 0 ‒ 300 ㎍/㎥ 범위 + # SMALL_SIGMA_RATIO = 0.10 # 정상 변동폭 = σ의 10 %(≈ ±2.5) -class DustSimulator(SimulatorInterface2): def __init__(self, idx: int, zone_id:str, equip_id:str, interval:int = 5, msg_count:int = 10, conn=None): # 시뮬레이터에서 공통적으로 사용하는 속성 super().__init__( @@ -18,16 +23,16 @@ def __init__(self, idx: int, zone_id:str, equip_id:str, interval:int = 5, msg_co self.type = "dust" # 센서 타입 self.shadow_regist_topic_name = f"$aws/things/Sensor/shadow/name/{self.sensor_id}/update" self.shadow_desired_topic_name = f"$aws/things/Sensor/shadow/name/{self.sensor_id}/update/desired" - self.topic_name = f"sensor/{zone_id}/{equip_id}/{self.sensor_id}/{self.type}" + self.topic_name = self._build_topic(zone_id, equip_id,self.sensor_id, self.type) self.target_current = None # 초기값 설정(shadow 용) - self.mu = 180 # 평균 미세먼지 수치 - self.sigma = 60 # 표준편차 - self.lower = 0 - self.upper = self.mu + 3 * self.sigma + # self.mu = 180 # 평균 미세먼지 수치 + # self.sigma = 60 # 표준편차 + # self.lower = 0 + # self.upper = self.mu + 3 * self.sigma - self.a = (self.lower - self.mu) / self.sigma - self.b = (self.upper - self.mu) / self.sigma + # self.a = (self.lower - self.mu) / self.sigma + # self.b = (self.upper - self.mu) / self.sigma # 데이터 생성 로직 정의 def _generate_data(self) -> dict: @@ -36,7 +41,7 @@ def _generate_data(self) -> dict: "equipId": self.equip_id, "sensorId": self.sensor_id, "sensorType": self.type, - "val": round(truncnorm.rvs(self.a, self.b, loc=self.mu, scale=self.sigma), 2) + "val": self._generate_continuous_val() } ################################################ @@ -46,11 +51,11 @@ def _generate_data(self) -> dict: def _apply_desired_state(self, desired_state): """ Shadow의 desired 상태를 받아서 센서에 적용 - 예) {"target_Vibration": 25.0} 이런 명령을 받아 적용 + 예) {"target_Dust": 25.0} 이런 명령을 받아 적용 """ - target_current = desired_state.get("target_current") - if target_current is not None: - self.target_current = target_current - print(f"Desired state applied: {self.sensor_id} - Target Current: {self.target_current}") + target_dust = desired_state.get("target_dust") + if target_dust is not None: + self.target_dust = target_dust + print(f"Desired state applied: {self.sensor_id} - Target Dust: {self.target_dust}") else: - print(f"No target current provided for {self.sensor_id}.") \ No newline at end of file + print(f"No target dust provided for {self.sensor_id}.") \ No newline at end of file diff --git a/service/simulation/HumiditySimulator.py b/service/simulation/HumiditySimulator.py index 49ad278..ac0b3ad 100644 --- a/service/simulation/HumiditySimulator.py +++ b/service/simulation/HumiditySimulator.py @@ -1,60 +1,67 @@ -from .SimulatorInterface2 import SimulatorInterface2 -import random - -class HumiditySimulator(SimulatorInterface2): - def __init__(self, idx: int, zone_id:str, equip_id:str, interval:int = 5, msg_count:int = 10, conn=None): - ######################################### - # 시뮬레이터에서 공통적으로 사용하는 속성 - ######################################### - super().__init__( - idx=idx, - zone_id=zone_id, - equip_id=equip_id, - interval=interval, - msg_count=msg_count, - conn=conn - ) - - ######################################### - # 시뮬레이터 마다 개별적으로 사용하는 속성(토픽, 수집 데이터 초기값) - ######################################### - self.sensor_id = f"UA10H-HUM-2406089{idx}" # 센서 ID - self.type = "humid" # 센서 타입 - # shadow 등록용 토픽 - self.shadow_regist_topic_name = f"$aws/things/Sensor/shadow/name/{self.sensor_id}/update" - # shadow 제어 명령 구독용 토픽 - self.shadow_desired_topic_name = f"$aws/things/Sensor/shadow/name/{self.sensor_id}/update/desired" - # 센서 데이터 publish용 토픽 - self.topic_name = f"sensor/{zone_id}/{equip_id}/{self.sensor_id}/{self.type}" - self.target_temperature = None # 초기값 설정(shadow 용) - - ################################################z - # 데이터 생성 로직을 정의 (시뮬레이터 마다 다르게 구현) - # 예) 온도, 습도, 진동, 전류 등등 - ################################################ - def _generate_data(self) -> dict: - """ 데이터 생성 메서드 """ - return { - "zoneId": self.zone_id, - "equipId": self.equip_id, - "sensorId": self.sensor_id, - "sensorType": self.type, - "val": round(random.gauss(mu = 11.68, sigma = 29.38), 2) # 0: 60이하,1: 60초과, 2: 80 초과과 - } - - ################################################ - # 제어 로직을 정의 ( shadow의 desired 상태를 구독하여 제어하는 로직을 구현할 예정) - ################################################ - def _apply_desired_state(self, desired_state): - """ - Shadow의 desired 상태를 받아서 센서에 적용 - 예) {"target_humidity": 25.0} 이런 명령을 받아 적용 - """ - target_humidity = desired_state.get("target_humidity") - if target_humidity is not None: - self.target_humidity = target_humidity - print(f"Desired state applied: {self.sensor_id} - Target humidity: {self.target_humidity}") - else: - print(f"No target humidity provided for {self.sensor_id}.") - +from .SimulatorInterface2 import SimulatorInterface2 +import random +from service.simulatelogic.ContinuousSimulatorMixin import ContinuousSimulatorMixin + +class HumiditySimulator(ContinuousSimulatorMixin,SimulatorInterface2): + # 타입별 시뮬레이터 세팅 + SENSOR_TYPE = "humid" + MU, SIGMA = 55, 15 + LOWER, UPPER = 0, 100 + OUTLIER_P = 0.1 + + def __init__(self, idx: int, zone_id:str, equip_id:str, interval:int = 5, msg_count:int = 10, conn=None): + ######################################### + # 시뮬레이터에서 공통적으로 사용하는 속성 + ######################################### + super().__init__( + idx=idx, + zone_id=zone_id, + equip_id=equip_id, + interval=interval, + msg_count=msg_count, + conn=conn + ) + + ######################################### + # 시뮬레이터 마다 개별적으로 사용하는 속성(토픽, 수집 데이터 초기값) + ######################################### + self.sensor_id = f"UA10H-HUM-3406089{idx}" # 센서 ID + self.type = "humid" # 센서 타입 + # shadow 등록용 토픽 + self.shadow_regist_topic_name = f"$aws/things/Sensor/shadow/name/{self.sensor_id}/update" + # shadow 제어 명령 구독용 토픽 + self.shadow_desired_topic_name = f"$aws/things/Sensor/shadow/name/{self.sensor_id}/update/desired" + # 센서 데이터 publish용 토픽 + self.topic_name = self._build_topic(zone_id, equip_id,self.sensor_id, self.type) + self.target_temperature = None # 초기값 설정(shadow 용) + + ################################################z + # 데이터 생성 로직을 정의 (시뮬레이터 마다 다르게 구현) + # 예) 온도, 습도, 진동, 전류 등등 + ################################################ + def _generate_data(self) -> dict: + """ 데이터 생성 메서드 """ + return { + "zoneId": self.zone_id, + "equipId": self.equip_id, + "sensorId": self.sensor_id, + "sensorType": self.type, + "val": self._generate_continuous_val() + } + + ################################################ + # 제어 로직을 정의 ( shadow의 desired 상태를 구독하여 제어하는 로직을 구현할 예정) + ################################################ + def _apply_desired_state(self, desired_state): + """ + Shadow의 desired 상태를 받아서 센서에 적용 + 예) {"target_humidity": 25.0} 이런 명령을 받아 적용 + """ + target_humidity = desired_state.get("target_humidity") + if target_humidity is not None: + self.target_humidity = target_humidity + print(f"Desired state applied: {self.sensor_id} - Target humidity: {self.target_humidity}") + else: + print(f"No target humidity provided for {self.sensor_id}.") + \ No newline at end of file diff --git a/service/simulation/SimulatorInterface2.py b/service/simulation/SimulatorInterface2.py index 9ad7929..4f5f375 100644 --- a/service/simulation/SimulatorInterface2.py +++ b/service/simulation/SimulatorInterface2.py @@ -59,7 +59,7 @@ def _publish_data(self): ) print(f"Published data to {self.topic_name}: {payload}") - def __subscribe_to_shadow_desired(self): + def _subscribe_to_shadow_desired(self): """ Shadow의 desired를 구독 (제어 메세지 수신용) """ # MQTT 연결 객체를 통해 subscribe 호출 self.conn.subscribe( @@ -95,19 +95,26 @@ def _publish_loop(self): try: for _ in range(self.msg_count): - if self.stop_event.is_set(): - break - self._publish_data() - time.sleep(self.interval) + try: + if self.stop_event.is_set(): + break + self._publish_data() + time.sleep(self.interval) + except Exception as e: + print(f"Error in publish loop: {e}") finally: self._update_shadow(status="OFF") ######################################################################################## # 시뮬레이션 객체에서 공통으로 사용할 스레드 관련 메서드 start_publishing, wait_until_done, stop ######################################################################################## def start_publishing(self): + if hasattr(self, '_read_and_publish_loop'): + print("[SimulatorInterface2] Skip publishing because child overrides it.") + return # 자식이 RealSensor라면 무시 + """ 센서 데이터 publish 작업을 스레드에서 시작 """ # Shadow의 desired 상태 구독 - callback으로 __apply_desired_state 메서드 사용 - self.__subscribe_to_shadow_desired() + self._subscribe_to_shadow_desired() # publish 시작 self.thread = threading.Thread(target=self._publish_loop) @@ -134,3 +141,25 @@ def generate_truncated_normal(self, mu: float, sigma: float, lower: float = None a, b = (lower - mu) / sigma, (upper - mu) / sigma value = truncnorm.rvs(a, b, loc=mu, scale=sigma) return round(value, 2) + + def _build_topic(self, zone_id, equip_id, sensor_id, sensor_type): + prefix = "zone" if zone_id == equip_id else "equip" + return f"sensor/{prefix}/{zone_id}/{equip_id}/{sensor_id}/{sensor_type}" + + def publish_value(self, sensor_type: str, value: float): + """주어진 sensor_type, value 를 payload 로 묶어 + prefix(zone/equip) 로 토픽을 만들고 publish.""" + topic = self._build_topic(self.zone_id, self.equip_id, self.sensor_id, sensor_type) + payload = json.dumps({ + "zoneId": self.zone_id, + "equipId": self.equip_id, + "sensorId": self.sensor_id, + "sensorType": sensor_type, + "val": value + }) + self.conn.publish(topic, payload, mqtt.QoS.AT_LEAST_ONCE) + print(f"Published data to {topic}: {payload}, {threading.current_thread().name}") + def stop_publishing(self): + """시뮬레이터 중지""" + self.stop_event.set() + print(f"[{self.__class__.__name__}] Stopping publishing...") \ No newline at end of file diff --git a/service/simulation/TempSimulator.py b/service/simulation/TempSimulator.py index e204eb0..ab869a5 100644 --- a/service/simulation/TempSimulator.py +++ b/service/simulation/TempSimulator.py @@ -1,77 +1,72 @@ -from .SimulatorInterface2 import SimulatorInterface2 -import random -from scipy.stats import truncnorm - - -class TempSimulator(SimulatorInterface2): - def __init__(self, idx: int, zone_id:str, equip_id:str, interval:int = 5, msg_count:int = 10, conn=None): - ######################################### - # 시뮬레이터에서 공통적으로 사용하는 속성 - ######################################### - super().__init__( - idx=idx, - zone_id=zone_id, - equip_id=equip_id, - interval=interval, - msg_count=msg_count, - conn=conn - ) - - ######################################### - # 시뮬레이터 마다 개별적으로 사용하는 속성(토픽, 수집 데이터 초기값) - ######################################### - - self.sensor_id = f"UA10T-TEM-2406089{idx}" # 센서 ID - self.type = "temp" # 센서 타입 - # shadow 등록용 토픽 - self.shadow_regist_topic_name = f"$aws/things/Sensor/shadow/name/{self.sensor_id}/update" - - # shadow 제어 명령 구독용 토픽 - self.shadow_desired_topic_name = f"$aws/things/Sensor/shadow/name/{self.sensor_id}/update/desired" - - # 센서 데이터 publish용 토픽 - self.topic_name = f"sensor/{zone_id}/{equip_id}/{self.sensor_id}/{self.type}" - - self.target_temperature = None # 초기값 설정(shadow 용) - - self.mu = 25 # 평균 온도 (정상 범위: 18~21℃) - self.sigma = 10 # 표준편차 (온도의 변동폭) - - # 절단 범위 설정 (최소값 -35℃, 최대값 50℃로 설정) - self.lower = -35 - self.upper = 50 - - # 정규분포 범위의 a, b 값 계산 - self.a = (self.lower - self.mu) / self.sigma - self.b = (self.upper - self.mu) / self.sigma - - ################################################z - # 데이터 생성 로직을 정의 (시뮬레이터 마다 다르게 구현) - # 예) 온도, 습도, 진동, 전류 등등 - ################################################ - def _generate_data(self) -> dict: - """ 데이터 생성 메서드 """ - return { - "zoneId": self.zone_id, - "equipId": self.equip_id, - "sensorId": self.sensor_id, - "sensorType": self.type, - "val": round(truncnorm.rvs(self.a, self.b, loc=self.mu, scale=self.sigma), 2) - } - - ################################################ - # 제어 로직을 정의 ( shadow의 desired 상태를 구독하여 제어하는 로직을 구현할 예정) - ################################################ - def _apply_desired_state(self, desired_state): - """ - Shadow의 desired 상태를 받아서 센서에 적용 - 예) {"target_humid": 25.0} 이런 명령을 받아 적용 - """ - target_humid = desired_state.get("target_humid") - if target_humid is not None: - self.target_humid = target_humid - print(f"Desired state applied: {self.sensor_id} - Target humid: {self.target_humid}") - else: - print(f"No target humid provided for {self.sensor_id}.") - +from .SimulatorInterface2 import SimulatorInterface2 +import random +from service.simulatelogic.ContinuousSimulatorMixin import ContinuousSimulatorMixin + +class TempSimulator(ContinuousSimulatorMixin, SimulatorInterface2): + # 정규분포 상속로직에 집어넣을 숫자들 + SENSOR_TYPE = "temp" # 센서 타입 + MU, SIGMA = 25, 10 # 평균, 표준편차 + LOWER, UPPER = -35, 50 # 최소, 최대값 + OUTLIER_P = 0.05 # 이상치 확률(기본 5 %) + + def __init__(self, idx: int, zone_id:str, equip_id:str, interval:int = 5, msg_count:int = 10, conn=None): + ######################################### + # 시뮬레이터에서 공통적으로 사용하는 속성 + ######################################### + super().__init__( + idx=idx, + zone_id=zone_id, + equip_id=equip_id, + interval=interval, + msg_count=msg_count, + conn=conn + ) + + ######################################### + # 시뮬레이터 마다 개별적으로 사용하는 속성(토픽, 수집 데이터 초기값) + ######################################### + + self.sensor_id = f"UA10T-TEM-3406089{idx}" # 센서 ID + self.type = "temp" # 센서 타입 + # shadow 등록용 토픽 + self.shadow_regist_topic_name = f"$aws/things/Sensor/shadow/name/{self.sensor_id}/update" + + # shadow 제어 명령 구독용 토픽 + self.shadow_desired_topic_name = f"$aws/things/Sensor/shadow/name/{self.sensor_id}/update/desired" + + # 센서 데이터 publish용 토픽 + # self.topic_name = f"sensor/{zone_id}/{equip_id}/{self.sensor_id}/{self.type}" + self.topic_name = self._build_topic(zone_id, equip_id,self.sensor_id, self.type) + + self.target_temperature = None # 초기값 설정(shadow 용) + + ################################################z + # 데이터 생성 로직을 정의 (시뮬레이터 마다 다르게 구현) + # 예) 온도, 습도, 진동, 전류 등등 + ################################################ + def _generate_data(self) -> dict: + """ 데이터 생성 메서드 """ + return { + "zoneId": self.zone_id, + "equipId": self.equip_id, + "sensorId": self.sensor_id, + "sensorType": self.type, + "val": self._generate_continuous_val() + } + + ################################################ + # 제어 로직을 정의 ( shadow의 desired 상태를 구독하여 제어하는 로직을 구현할 예정) + ################################################ + def _apply_desired_state(self, desired_state): + """ + Shadow의 desired 상태를 받아서 센서에 적용 + 예) {"target_humid": 25.0} 이런 명령을 받아 적용 + """ + target_temperature = desired_state.get("target_Temperature") + if target_temperature is not None: + self.target_temperature = target_temperature + print(f"Desired state applied: {self.sensor_id} - Target Temperature: {self.target_temperature}") + else: + print(f"No target temp provided for {self.sensor_id}.") + \ No newline at end of file diff --git a/service/simulation/VibrationSimulator.py b/service/simulation/VibrationSimulator.py index 06ff015..d8eadf6 100644 --- a/service/simulation/VibrationSimulator.py +++ b/service/simulation/VibrationSimulator.py @@ -1,9 +1,15 @@ from .SimulatorInterface2 import SimulatorInterface2 -from simulate_type.simulate_list import generate_vibration_data -import random -from scipy.stats import truncnorm +from service.simulatelogic.ContinuousSimulatorMixin import ContinuousSimulatorMixin + +class VibrationSimulator(ContinuousSimulatorMixin ,SimulatorInterface2): + # 정규분포 상속로직에 집어넣을 숫자들 + SENSOR_TYPE = "vibration" # 센서 타입 + # MU, SIGMA = 3.5, 2 # 평균, 표준편차 + MU, SIGMA = 2.0, 2.0 # 평균, 표준편차 + LOWER, UPPER = 0, 10 # 최소, 최대값 + OUTLIER_P = 0.06 # 이상치 확률(기본 6 %) + SMALL_SIGMA_RATIO = 0.25 # 정상 구간 변동폭을 σ의 10 %로 -class VibrationSimulator(SimulatorInterface2): def __init__(self, idx: int, zone_id:str, equip_id:str, interval:int = 5, msg_count:int = 10, conn=None): # 시뮬레이터에서 공통적으로 사용하는 속성 super().__init__( @@ -19,18 +25,18 @@ def __init__(self, idx: int, zone_id:str, equip_id:str, interval:int = 5, msg_co self.type = "vibration" # Sensor type self.shadow_regist_topic_name = f"$aws/things/Sensor/shadow/name/{self.sensor_id}/update" self.shadow_desired_topic_name = f"$aws/things/Sensor/shadow/name/{self.sensor_id}/update/desired" - self.topic_name = f"sensor/{zone_id}/{equip_id}/{self.sensor_id}/{self.type}" + self.topic_name = self._build_topic(zone_id, equip_id,self.sensor_id, self.type) self.target_vibration = None # Initial value for shadow) - self.mu = 3.5 # 평균 진동값 - self.sigma = 2 # 표준편차 (진동의 변동폭) + # self.mu = 3.5 # 평균 진동값 + # self.sigma = 2 # 표준편차 (진동의 변동폭) - # 절단 범위 설정 (최소값 0, 최대값 10으로 설정) - self.lower = 0 - self.upper = 10 + # # 절단 범위 설정 (최소값 0, 최대값 10으로 설정) + # self.lower = 0 + # self.upper = 10 - # 정규분포 범위의 a, b 값 계산 - self.a = (self.lower - self.mu) / self.sigma - self.b = (self.upper - self.mu) / self.sigma + # # 정규분포 범위의 a, b 값 계산 + # self.a = (self.lower - self.mu) / self.sigma + # self.b = (self.upper - self.mu) / self.sigma # 데이터 생성 로직을 정의 (시뮬레이터 마다 다르게 구현) def _generate_data(self) -> dict: @@ -39,7 +45,7 @@ def _generate_data(self) -> dict: "equipId": self.equip_id, "sensorId": self.sensor_id, "sensorType": self.type, - "val": round(truncnorm.rvs(self.a, self.b, loc=self.mu, scale=self.sigma), 2) + "val": self._generate_continuous_val() } diff --git a/service/simulation/VocSimulator.py b/service/simulation/VocSimulator.py index ebab0c3..1da0c11 100644 --- a/service/simulation/VocSimulator.py +++ b/service/simulation/VocSimulator.py @@ -1,7 +1,13 @@ from .SimulatorInterface2 import SimulatorInterface2 -import random +from service.simulatelogic.ContinuousSimulatorMixin import ContinuousSimulatorMixin + +class VocSimulator(ContinuousSimulatorMixin, SimulatorInterface2): + # voc_simulator.py – 상단 상수 정의 부분 + SENSOR_TYPE = "voc" + MU, SIGMA = 400, 250 # 중심을 안전 구간에 가깝게, σ를 줄임 + LOWER, UPPER = 0, 2000 # 실제 상한값을 넉넉히 확보 + OUTLIER_P = 0.05 # 5% 확률로 위험 구간을 넘기기 위한 이상치 발생 -class VocSimulator(SimulatorInterface2): def __init__(self, idx: int, zone_id:str, equip_id:str, interval:int = 5, msg_count:int = 10, conn=None): # 시뮬레이터에서 공통적으로 사용하는 속성 super().__init__( @@ -17,7 +23,7 @@ def __init__(self, idx: int, zone_id:str, equip_id:str, interval:int = 5, msg_co self.type = "voc" # 센서 타입 self.shadow_regist_topic_name = f"$aws/things/Sensor/shadow/name/{self.sensor_id}/update" self.shadow_desired_topic_name = f"$aws/things/Sensor/shadow/name/{self.sensor_id}/update/desired" - self.topic_name = f"sensor/{zone_id}/{equip_id}/{self.sensor_id}/{self.type}" + self.topic_name = self._build_topic(zone_id, equip_id,self.sensor_id, self.type) self.target_current = None # 초기값 설정(shadow 용) # 데이터 생성 로직 정의 @@ -27,7 +33,7 @@ def _generate_data(self) -> dict: "equipId": self.equip_id, "sensorId": self.sensor_id, "sensorType": self.type, - "val": round(random.uniform(5.0 + self.idx, 50.0 + self.idx), 2) + "val": self._generate_continuous_val() } ################################################ @@ -37,11 +43,11 @@ def _generate_data(self) -> dict: def _apply_desired_state(self, desired_state): """ Shadow의 desired 상태를 받아서 센서에 적용 - 예) {"target_Vibration": 25.0} 이런 명령을 받아 적용 + 예) {"target_Voc": 25.0} 이런 명령을 받아 적용 """ - target_current = desired_state.get("target_current") - if target_current is not None: - self.target_current = target_current - print(f"Desired state applied: {self.sensor_id} - Target Current: {self.target_current}") + target_voc = desired_state.get("target_voc") + if target_voc is not None: + self.target_voc = target_voc + print(f"Desired state applied: {self.sensor_id} - Target Voc: {self.target_voc}") else: - print(f"No target current provided for {self.sensor_id}.") \ No newline at end of file + print(f"No target voc provided for {self.sensor_id}.") \ No newline at end of file diff --git a/service/simulation/__init__.py b/service/simulation/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/service/simulation/simulateTest.py b/service/simulation/simulateTest.py index d9225fa..40635c2 100644 --- a/service/simulation/simulateTest.py +++ b/service/simulation/simulateTest.py @@ -6,6 +6,7 @@ import threading from awscrt import mqtt from mqtt_util.publish import AwsMQTT +from service.sensor.RealSensor import RealSensor # 각 시뮬레이션 인터페이스에서 해당 데이터들을 사용, 메인에선 사용 X # from simulate_type.simulate_list import generate_temp_data, generate_humidity_data, generate_humidity_temp_data, generate_wearable_data, generate_vibration_data, generate_current_data from .factory import get_simulator @@ -15,26 +16,58 @@ # 스레드에서 실행될 시뮬레이션 함수 def run_simulator(simulator, count, interval): for _ in range(count): + print(count) data = simulator.start_publishing() print(json.dumps(data, indent=4)) # 데이터를 JSON 형식으로 출력 time.sleep(interval) -def run_simulator_from_streamlit(simulator_type, count, interval, sensor_num, zone_id, equip_id): +def run_simulator_from_streamlit(simulator_type, count, interval, sensor_num, zone_id, equip_id, stop_event=None): + # real_sensor 면 RealSensor 로직을 쓰고, Thread 를 반환받습니다. + if simulator_type == "real_sensor": + real = RealSensor( + idx=sensor_num, + zone_id=zone_id, + equip_id=equip_id, + interval=interval, + msg_count=count, + conn=conn, + stop_event=stop_event # stop_event 전달 + ) + t = real.start_publishing() + return [t] if t else [] + # threads.append(t) + simulators = get_simulator( - conn=AwsMQTT(), + conn=conn, simulator_type=simulator_type, idx=sensor_num, zone_id=zone_id, equip_id=equip_id, interval=interval, - msg_count=count + msg_count=1 ) + threads = [] for simulator in simulators: - for _ in range(count): - data = simulator.start_publishing() - print(json.dumps(data, indent=4)) # 데이터를 JSON 형식으로 출력 - time.sleep(interval) + # 스레드로 실행하여 비동기 처리 + thread = threading.Thread(target=run_simulator_with_stop, args=( + simulator, count, interval, stop_event + )) + thread.start() + threads.append(thread) + + return threads +# 새로운 함수: stop_event가 설정된 상태에서 시뮬레이터 실행 +def run_simulator_with_stop(simulator, count, interval, stop_event=None): + for i in range(count): + if stop_event and stop_event.is_set(): + print(f"[Simulator] Stopping due to stop_event") + break + + data = simulator.start_publishing() + print(f"[{time.strftime('%H:%M:%S')}] Publishing data: {json.dumps(data)}") + time.sleep(interval) # 실제 interval 간격으로 실행 + # 시뮬레이션 함수 def run_simulation_from_json(json_file_path): # JSON 파일 읽기 @@ -58,6 +91,20 @@ def run_simulation_from_json(json_file_path): print(f"Starting simulation for {simulator_type} with {sensor_num} sensors...") + if simulator_type == "real_sensor": + # RealSensor 인스턴스를 생성해서 .start_publishing() 를 스레드로 띄움 + real = RealSensor( + idx=sensor_num, + zone_id=zone_id, + equip_id=equip_id, + interval=interval, + msg_count=count, + conn=conn + ) + t = real.start_publishing() # 이제 스레드를 직접 반환받음 + threads.append(t) + continue + # 시뮬레이터 생성 simulators = get_simulator( conn = conn, @@ -66,7 +113,7 @@ def run_simulation_from_json(json_file_path): zone_id=zone_id, equip_id=equip_id, interval=interval, - msg_count=count + msg_count=1 ) # 데이터 생성 및 출력 @@ -78,6 +125,11 @@ def run_simulation_from_json(json_file_path): thread = threading.Thread(target=run_simulator, args=(simulator, count, interval)) threads.append(thread) thread.start() + # # 모든 스레드가 종료될 때까지 대기 + # for thread in threads: + # thread.join() + + print("All simulations completed.") if __name__ == "__main__": # JSON 파일 경로 diff --git a/simulation_cconfig.json b/simulation_cconfig.json index 1890c2c..82a397a 100644 --- a/simulation_cconfig.json +++ b/simulation_cconfig.json @@ -1,28 +1,20 @@ -{ - "devices": [ - { - "count": 1, - "interval": 10.0, - "equip_id": "SBID-124", - "zone_id": "PID-711", - "simulator": "temp", - "sensor_num": 2 - }, - { - "count": 1, - "interval": 10.0, - "equip_id": "SBID-125", - "zone_id": "PID-790", - "simulator": "humidity", - "sensor_num": 3 - }, - { - "count": 1, - "interval": 10.0, - "equip_id": "SBID-126", - "zone_id": "PID-791", - "simulator": "vibration", - "sensor_num": 4 - } - ] +{ + "devices": [ + { + "count": 5, + "interval": 5, + "equip_id": "20250507171316-389", + "zone_id": "20250507165750-827", + "simulator": "real_sensor", + "sensor_num": 1 + }, + { + "count": 5, + "interval": 5, + "equip_id": "20250507171316-389", + "zone_id": "20250507165750-827", + "simulator": "current", + "sensor_num": 1 + } + ] } \ No newline at end of file diff --git a/streamlit_app/app.py b/streamlit_app/app.py index 6c3b2d4..590e003 100644 --- a/streamlit_app/app.py +++ b/streamlit_app/app.py @@ -77,13 +77,36 @@ def load_from_db(): # Function to run simulation with stop functionality def run_simulation_with_stop(simulator_type, count, interval, sensor_num, zone_id, equip_id, stop_event): - for _ in range(count): - if stop_event.is_set(): # Stop 이벤트가 설정되었는지 확인 - print(f"Stopping simulation for {simulator_type}") - break - run_simulator_from_streamlit(simulator_type, count, interval, sensor_num, zone_id, equip_id) - time.sleep(interval) # 시뮬레이션 간격 - + # RealSensor와 가상 시뮬레이터를 구분하여 처리 + if simulator_type == "real_sensor": + # RealSensor 모드: 쓰레드를 실행하고 포트 해제를 위해 join() 사용 + threads = run_simulator_from_streamlit( + simulator_type, count, interval, + sensor_num, zone_id, equip_id, + stop_event # stop_event 전달 중요! + ) + + # join() 호출 제거 - 쓰레드가 백그라운드에서 실행되도록 함 + print(f"RealSensor threads started in background. Count: {len(threads or [])}") + + # 선택사항: 쓰레드 정리를 위한 참조 저장 + # 현재의 코드에서는 이미 simulation_threads에 참조가 저장되므로 추가 작업 필요 없음 + else: + # 가상 시뮬레이터 모드: 기존에 잘 작동하던 방식 사용 + for _ in range(count): + if stop_event.is_set(): # Stop 이벤트가 설정되었는지 확인 + print(f"Stopping simulation for {simulator_type}") + break + + # 시뮬레이터 한 번 실행 + run_simulator_from_streamlit( + simulator_type, 1, interval, # count=1로 한 번만 실행 + sensor_num, zone_id, equip_id, + stop_event + ) + + time.sleep(interval) # 시뮬레이션 간격 + # Streamlit app def main(): st.title("Simulation Configuration Manager") @@ -151,6 +174,8 @@ def main(): else: st.write("No devices found. Please load data or add a new device.") + + # Add new device st.header("Add New Device") if st.button("Add Device"): @@ -173,6 +198,7 @@ def main(): save_to_db(st.session_state.data) st.success("Saved data to SQLite.") + if __name__ == "__main__": init_db() main() \ No newline at end of file