From 38d216b32255524da920958a5df45ed6f2158925 Mon Sep 17 00:00:00 2001 From: senthurayyappan Date: Wed, 19 Feb 2025 17:49:23 +0000 Subject: [PATCH] fix: use soft realtime clock instead of while loops --- opensourceleg/logging/logger.py | 356 ++++++++++++++----- opensourceleg/time/time.py | 2 +- tutorials/actuators/dephy/sensor_reading.py | 38 +- tutorials/actuators/dephy/voltage_control.py | 99 +----- 4 files changed, 295 insertions(+), 200 deletions(-) diff --git a/opensourceleg/logging/logger.py b/opensourceleg/logging/logger.py index 0515f2a..831a6fa 100644 --- a/opensourceleg/logging/logger.py +++ b/opensourceleg/logging/logger.py @@ -27,15 +27,33 @@ import csv import logging import os -import time from collections import deque from datetime import datetime from enum import Enum from logging.handlers import RotatingFileHandler from typing import Any, Callable, Optional, Union +__all__ = ["LOGGER", "LOG_LEVEL", "Logger"] + + +class LogLevel(Enum): + """ + Enumerates the possible log levels. + + Attributes: + DEBUG (int): Debug log level + INFO (int): Info log level + WARNING (int): Warning log level + ERROR (int): Error log level + CRITICAL (int): Critical log level + + Examples: + >>> LogLevel.DEBUG + 10 + >>> LogLevel.INFO + 20 + """ -class LogLevel(int, Enum): DEBUG = logging.DEBUG INFO = logging.INFO WARNING = logging.WARNING @@ -44,11 +62,62 @@ class LogLevel(int, Enum): class Logger(logging.Logger): + """ + Represents a custom singleton logger class that extends the built-in Python logger. The logger provides additional + functionality for tracking and logging variables to a CSV file. It supports different log levels and log formatting + options. + + Args: + log_path (str): The path to save log files. + log_format (str): The log message format. + file_level (LogLevel): The log level for file output. + stream_level (LogLevel): The log level for console output. + file_max_bytes (int): The maximum size of the log file in bytes before rotation. + file_backup_count (int): The number of backup log files to keep. + file_name (Union[str, None]): The base name for the log file. + buffer_size (int): The maximum number of log entries to buffer before writing to the CSV file. + + Properties: + - **file_path**: The path to the log file. + - **buffer_size**: The maximum number of log entries to buffer. + - **file_level**: The log level for file output. + - **stream_level**: The log level for console output. + - **file_max_bytes**: The maximum size of the log file in bytes before rotation. + - **file_backup_count**: The number of backup log files to keep. + + Methods: + - **track_variable**: Track a variable for logging. + - **untrack_variable**: Stop tracking a variable. + - **flush_buffer**: Write the buffered log entries to the CSV file. + - **reset**: Reset the logger state. + - **close**: Close the logger and flush any remaining log entries. + - **debug**: Log a debug message. + - **info**: Log an info message. + - **warning**: Log a warning message. + - **error**: Log an error message. + - **critical**: Log a critical message. + - **log**: Log a message at a specific log level. + + Examples: + >>> logger = Logger() + >>> logger.info("This is an info message") + [2022-01-01 12:00:00] INFO: This is an info message + >>> logger.debug("This is a debug message") + [2022-01-01 12:00:00] DEBUG: This is a debug message + + >>> logger.track_variable(lambda: 42, "answer") + >>> logger.update() + >>> logger.flush_buffer() + + """ + _instance = None - def __new__(cls, *args: Any, **kwargs: Any) -> "Logger": + def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super().__new__(cls) + else: + print(f"Reusing existing Logger instance: {id(cls._instance)}") return cls._instance def __init__( @@ -62,6 +131,19 @@ def __init__( file_name: Union[str, None] = None, buffer_size: int = 1000, ) -> None: + """ + Initialize the custom logger with the specified configuration. + + Args: + log_path: The path to save log files. + log_format: The log message format. + file_level: The log level for file output. + stream_level: The log level for console output. + file_max_bytes: The maximum size of the log file in bytes before rotation. + file_backup_count: The number of backup log files to keep. + file_name: The base name for the log file. + buffer_size: The maximum number of log entries to buffer before writing to the CSV file. + """ if not hasattr(self, "_initialized"): super().__init__(__name__) self._log_path = log_path @@ -72,10 +154,10 @@ def __init__( self._file_backup_count = file_backup_count self._user_file_name = file_name - self._file_path: Optional[str] = None - self._csv_path: Optional[str] = None + self._file_path: str = "" + self._csv_path: str = "" self._file: Optional[Any] = None - self._writer: Any = None + self._writer = None self._is_logging = False self._header_written = False @@ -87,7 +169,6 @@ def __init__( self._setup_logging() self._initialized: bool = True else: - self._log_path = log_path self.set_file_name(file_name) self.set_file_level(file_level) self.set_stream_level(stream_level) @@ -96,39 +177,76 @@ def __init__( self._file_backup_count = file_backup_count self.set_buffer_size(buffer_size) + self._log_path = log_path + def _setup_logging(self) -> None: - self.setLevel(level=self._file_level) - self._std_formatter = logging.Formatter(self._log_format) + if not hasattr(self, "_stream_handler"): # Prevent duplicate handlers + self.setLevel(level=self._file_level.value) + self._std_formatter = logging.Formatter(self._log_format) - self._stream_handler = logging.StreamHandler() - self._stream_handler.setLevel(level=self._stream_level) - self._stream_handler.setFormatter(fmt=self._std_formatter) - self.addHandler(hdlr=self._stream_handler) + self._stream_handler = logging.StreamHandler() + self._stream_handler.setLevel(level=self._stream_level.value) + self._stream_handler.setFormatter(fmt=self._std_formatter) + self.addHandler(hdlr=self._stream_handler) def _setup_file_handler(self) -> None: - if not self._file_path: + if not hasattr(self, "_file_handler"): # Ensure file handler is added only once self._generate_file_paths() - self._file_handler = RotatingFileHandler( - filename=self._file_path if self._file_path else "", - mode="w", - maxBytes=self._file_max_bytes, - backupCount=self._file_backup_count, - ) - self._file_handler.setLevel(level=self._file_level) - self._file_handler.setFormatter(fmt=self._std_formatter) - self.addHandler(hdlr=self._file_handler) - - def _ensure_file_handler(self) -> None: + self._file_handler = RotatingFileHandler( + filename=self._file_path, + mode="w", + maxBytes=self._file_max_bytes, + backupCount=self._file_backup_count, + encoding="utf-8", + ) + self._file_handler.setLevel(level=self._file_level.value) + self._file_handler.setFormatter(fmt=self._std_formatter) + self.addHandler(hdlr=self._file_handler) + + def _ensure_file_handler(self): if not hasattr(self, "_file_handler"): self._setup_file_handler() def track_variable(self, var_func: Callable[[], Any], name: str) -> None: + """ + Record the value of a variable and log it to a CSV file. + + Args: + var_func: A function that returns the value of the variable. + name: The name of the variable. + + Examples: + >>> class MyClass: + ... def __init__(self): + ... self.value = 42 + >>> obj = MyClass() + >>> LOGGER.track_variable(lambda: obj.value, "answer") + >>> LOGGER.update() + >>> LOGGER.flush_buffer() + """ + var_id = id(var_func) self._tracked_vars[var_id] = var_func self._var_names[var_id] = name def untrack_variable(self, var_func: Callable[[], Any]) -> None: + """ + Stop tracking a variable and remove it from the logger buffer. + + Args: + var_func: The function used to track the variable. + + Examples: + >>> class MyClass: + ... def __init__(self): + ... self.value = 42 + >>> obj = MyClass() + >>> LOGGER.track_variable(lambda: obj.value, "answer") + >>> LOGGER.update() + >>> LOGGER.flush_buffer() + >>> LOGGER.untrack_variable(lambda: obj.value) + """ var_id = id(var_func) self._tracked_vars.pop(var_id, None) self._var_names.pop(var_id, None) @@ -136,24 +254,71 @@ def untrack_variable(self, var_func: Callable[[], Any]) -> None: def __repr__(self) -> str: return f"Logger(file_path={self._file_path})" - def set_log_path(self, log_path: str) -> None: - self._log_path = log_path - self._generate_file_paths() - def set_file_name(self, file_name: Union[str, None]) -> None: + """ + Set the base name for the log file. + + Args: + file_name: The base name for the log file. + + Examples: + >>> LOGGER.set_file_name("my_log_file") + >>> LOGGER.file_path + "./my_log_file.log" + """ + # if filename has an extension, remove it + if file_name is not None and "." in file_name: + file_name = file_name.split(".")[0] + self._user_file_name = file_name - self._generate_file_paths() + self._file_path = "" + self._csv_path = "" def set_file_level(self, level: LogLevel) -> None: + """ + Set the log level for file output. + + Args: + level: The log level for file output. + + Examples: + >>> LOGGER.set_file_level(LogLevel.INFO) + >>> LOGGER.file_level + LogLevel.INFO + >>> LOGGER.debug("This is a debug message and will not be logged") + """ self._file_level = level if hasattr(self, "_file_handler"): - self._file_handler.setLevel(level=level) + self._file_handler.setLevel(level=level.value) def set_stream_level(self, level: LogLevel) -> None: + """ + Set the log level for console output. + + Args: + level: The log level for console output. + + Examples: + >>> LOGGER.set_stream_level(LogLevel.INFO) + >>> LOGGER.stream_level + LogLevel.INFO + >>> LOGGER.debug("This is a debug message and will not be streamed") + """ self._stream_level = level - self._stream_handler.setLevel(level=level) + self._stream_handler.setLevel(level=level.value) def set_format(self, log_format: str) -> None: + """ + Set the log message format. The format string uses the same syntax as the built-in Python logging module. + + Args: + log_format: The log message format. + + Examples: + >>> LOGGER.set_format("[%(asctime)s] %(levelname)s: %(message)s") + >>> LOGGER.info("This is an info message") + [2022-01-01 12:00:00] INFO: This is an info message + """ self._log_format = log_format self._std_formatter = logging.Formatter(log_format) if hasattr(self, "_file_handler"): @@ -161,10 +326,27 @@ def set_format(self, log_format: str) -> None: self._stream_handler.setFormatter(fmt=self._std_formatter) def set_buffer_size(self, buffer_size: int) -> None: + """ + Set the maximum number of log entries to buffer before writing to the CSV file. + + Args: + buffer_size: The maximum number of log entries to buffer. + """ self._buffer_size = buffer_size self._buffer = deque(self._buffer, maxlen=buffer_size) def update(self) -> None: + """ + Update the logger by logging the current values of tracked variables to the buffer. + + Examples: + >>> class MyClass: + ... def __init__(self): + ... self.value = 42 + >>> obj = MyClass() + >>> LOGGER.track_variable(lambda: obj.value, "answer") + >>> LOGGER.update() + """ if not self._tracked_vars: return @@ -179,17 +361,16 @@ def update(self) -> None: self.flush_buffer() def flush_buffer(self) -> None: + """ + Write the buffered log entries to the CSV file. + """ if not self._buffer: return self._ensure_file_handler() if self._file is None: - self._file = open( - self._csv_path if self._csv_path else "", - mode="w", - newline="", - ) + self._file = open(self._csv_path, "w", newline="") self._writer = csv.writer(self._file) if not self._header_written: @@ -202,7 +383,7 @@ def flush_buffer(self) -> None: def _write_header(self) -> None: header = list(self._var_names.values()) - self._writer.writerow(header) + self._writer.writerow(header) # type: ignore[assignment] self._header_written = True def _generate_file_paths(self) -> None: @@ -212,130 +393,119 @@ def _generate_file_paths(self) -> None: base_name = self._user_file_name if self._user_file_name else f"{script_name}_{timestamp}" - if not os.path.exists(self._log_path): - os.makedirs(self._log_path) - file_path = os.path.join(self._log_path, base_name) - self._file_path = file_path + ".log" self._csv_path = file_path + ".csv" def __enter__(self) -> "Logger": return self - def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: + def __exit__(self, exc_type, exc_val, exc_tb) -> None: self.close() def reset(self) -> None: - self.close() - self._setup_logging() - + """ + Reset the logger state. + """ + self._buffer.clear() self._tracked_vars.clear() self._var_names.clear() self._header_written = False - if hasattr(self, "_file_handler"): self._file_handler.close() del self._file_handler - # re-initialize the logger - def close(self) -> None: - self.flush_buffer() + """ + Close the logger and flush any remaining log entries. + Examples: + >>> LOGGER.close() + >>> LOGGER.info("This message will not be logged") + """ + self.flush_buffer() if self._file: self._file.close() self._file = None self._writer = None - def debug(self, msg: object, *args: object, **kwargs: Any) -> None: + def debug(self, msg, *args, **kwargs): self._ensure_file_handler() super().debug(msg, *args, **kwargs) - def info(self, msg: object, *args: object, **kwargs: Any) -> None: + def info(self, msg, *args, **kwargs): self._ensure_file_handler() super().info(msg, *args, **kwargs) - def warning(self, msg: object, *args: object, **kwargs: Any) -> None: + def warning(self, msg, *args, **kwargs): self._ensure_file_handler() super().warning(msg, *args, **kwargs) - def error(self, msg: object, *args: object, **kwargs: Any) -> None: + def error(self, msg, *args, **kwargs): self._ensure_file_handler() super().error(msg, *args, **kwargs) - def critical(self, msg: object, *args: object, **kwargs: Any) -> None: + def critical(self, msg, *args, **kwargs): self._ensure_file_handler() super().critical(msg, *args, **kwargs) - def log(self, level: int, msg: object, *args: object, **kwargs: Any) -> None: + def log(self, level, msg, *args, **kwargs): self._ensure_file_handler() super().log(level, msg, *args, **kwargs) @property - def file_path(self) -> Optional[str]: + def file_path(self) -> str: + """ + Get the path to the log file. + """ + if self._file_path == "": + self._generate_file_paths() return self._file_path - @property - def csv_path(self) -> Optional[str]: - return self._csv_path - - @property - def log_path(self) -> str: - return self._log_path - @property def buffer_size(self) -> int: + """ + Get the maximum number of log entries to buffer before writing to the CSV file. + """ return self._buffer_size @property def file_level(self) -> LogLevel: + """ + Get the log level for file output (.log). + """ return self._file_level @property def stream_level(self) -> LogLevel: + """ + Get the log level for console output. + """ return self._stream_level @property def file_max_bytes(self) -> int: + """ + Get the maximum size of the log file in bytes before rotation. + """ return self._file_max_bytes @property def file_backup_count(self) -> int: + """ + Get the number of backup log files to keep. + """ return self._file_backup_count # Initialize a global logger instance to be used throughout the library -SCRIPT_DIR = os.path.dirname(__file__) LOGGER = Logger() +LOG_LEVEL = dict(enumerate(LogLevel.__members__.values())) if __name__ == "__main__": + LOGGER.info("This is an info message") - class Test: - def __init__(self) -> None: - self.a: float = 0.0 - - def update(self) -> None: - self.a += 0.2 - - my_logger = Logger(buffer_size=1, file_name="test_logger", log_path="./logs") - x = 0.0 - y = 0.0 - - test = Test() - - my_logger.track_variable(lambda: x, "x") - my_logger.track_variable(lambda: y, "y") - my_logger.track_variable(lambda: test.a, "A") - my_logger.info("Starting logging...") - - for _i in range(1000): - x += 0.1 - y = x**2 - - test.update() - my_logger.update() - - time.sleep(1 / 500) + LOGGER.set_stream_level(LogLevel.CRITICAL) - my_logger.close() + LOGGER.info("This is an info message and won't be displayed") + LOGGER.critical("This is a critical message and will be displayed") diff --git a/opensourceleg/time/time.py b/opensourceleg/time/time.py index 6db4a7c..f0606f2 100644 --- a/opensourceleg/time/time.py +++ b/opensourceleg/time/time.py @@ -100,7 +100,7 @@ class SoftRealtimeLoop: """ - def __init__(self, dt: float = 0.001, report: bool = False, fade: float = 0.0): + def __init__(self, dt: float = 0.001, report: bool = True, fade: float = 0.0): self.t0: float = time.monotonic() self.t1: float = self.t0 self.killer: LoopKiller = LoopKiller(fade_time=fade) diff --git a/tutorials/actuators/dephy/sensor_reading.py b/tutorials/actuators/dephy/sensor_reading.py index 08d2bd4..137bc0f 100644 --- a/tutorials/actuators/dephy/sensor_reading.py +++ b/tutorials/actuators/dephy/sensor_reading.py @@ -1,29 +1,21 @@ -import time - import opensourceleg.actuators.dephy as Dephy from opensourceleg.logging.logger import LOGGER +from opensourceleg.time import SoftRealtimeLoop + +FREQUENCY = 1000 -actpack = Dephy.DephyActuator( - port="/dev/ttyACM0", - gear_ratio=1.0, -) -with actpack: - try: - print("Case:", actpack.case_temperature) - print("Winding:", actpack.winding_temperature) - while True: - print(actpack._data) +if __name__ == "__main__": + clock = SoftRealtimeLoop(dt=1 / FREQUENCY) + actpack = Dephy.DephyActuator( + port="/dev/ttyACM0", + gear_ratio=1.0, + ) + + with actpack: + for t in clock: actpack.update() LOGGER.info( - "".join( - f"Motor Position: {actpack.motor_position}\t" - + f"Motor Voltage: {actpack.motor_voltage}\t" - + f"Motor Current: {actpack.motor_current}\t" - + f"Case Temperature: {actpack.case_temperature}" - + f"Winding Temperature: {actpack.winding_temperature}" - ) + f"Motor Position: {actpack.motor_position}; " + + f"Motor Voltage: {actpack.motor_voltage}; " + + f"Motor Current: {actpack.motor_current}; " ) - time.sleep(0.005) - - except Exception: - exit() diff --git a/tutorials/actuators/dephy/voltage_control.py b/tutorials/actuators/dephy/voltage_control.py index 8bef9fe..423284a 100644 --- a/tutorials/actuators/dephy/voltage_control.py +++ b/tutorials/actuators/dephy/voltage_control.py @@ -1,95 +1,28 @@ -# import time -# import sys -# from opensourceleg.actuators.base import CONTROL_MODES -# import opensourceleg.actuators.dephy as Dephy -# from opensourceleg.logging.logger import LOGGER - -# actpack = Dephy.DephyActuator( -# port="/dev/ttyACM0", -# gear_ratio=1.0, -# ) - -# with actpack: - -# actpack.set_control_mode(mode=CONTROL_MODES.VOLTAGE) - -# while True: -# actpack.set_motor_voltage(value=3000) # in mV -# actpack.update() -# LOGGER.info( -# "".join( -# f"Motor Position: {actpack.motor_position}\t" -# + f"Case Temperature: {actpack.case_temperature}" -# + f"Winding Temperature: {actpack.winding_temperature}" -# ) -# ) -# time.sleep(0.005) - - -import time - -import pandas as pd - import opensourceleg.actuators.dephy as Dephy from opensourceleg.actuators.base import CONTROL_MODES from opensourceleg.logging.logger import LOGGER from opensourceleg.time import SoftRealtimeLoop +FREQUENCY = 1000 TIME_TO_STEP = 1.0 -FREQUENCY = 200 -DT = 1 / FREQUENCY - -def main(): +if __name__ == "__main__": + clock = SoftRealtimeLoop(dt=1 / FREQUENCY) actpack = Dephy.DephyActuator( port="/dev/ttyACM0", gear_ratio=1.0, ) - voltage_data = pd.DataFrame({ - "Time": [], - "Output_Voltage": [], - "Command_Voltage": [], - }) - clock = SoftRealtimeLoop(dt=DT) - with actpack: - try: - actpack.set_control_mode(mode=CONTROL_MODES.VOLTAGE) - - for t in clock: - if t > TIME_TO_STEP: - command_voltage = 3000 - actpack.set_motor_voltage(value=command_voltage) # in mV - - else: - command_voltage = 0 - - actpack.update() - - LOGGER.info( - "".join( - f"Motor Position: {actpack.motor_position}\t" - + f"Motor Voltage: {actpack.motor_voltage}\t" - + f"Motor Current: {actpack.motor_current}\t" - ) - ) - voltage_data = pd.concat( - [ - voltage_data, - pd.DataFrame({ - "Time": [t], - "Output_Voltage": [actpack.motor_voltage], - "Command_Voltage": [command_voltage], - }), - ], - ignore_index=True, - ) - time.sleep(DT) - - finally: - voltage_data.to_csv("voltage_data_dephy.csv", index=False) - exit() - - -if __name__ == "__main__": - main() + with actpack: + actpack.set_control_mode(mode=CONTROL_MODES.VOLTAGE) + + for t in clock: + actpack.update() + + if t > TIME_TO_STEP: + actpack.set_motor_voltage(value=1000) + LOGGER.info( + f"Motor Position: {actpack.motor_position}; " + + f"Motor Voltage: {actpack.motor_voltage}; " + + f"Motor Current: {actpack.motor_current}; " + )