diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000..6efd417 Binary files /dev/null and b/.DS_Store differ diff --git a/assets/.DS_Store b/assets/.DS_Store new file mode 100644 index 0000000..50c27e7 Binary files /dev/null and b/assets/.DS_Store differ diff --git a/docs/modules.md b/docs/modules.md deleted file mode 100644 index 673c84e..0000000 --- a/docs/modules.md +++ /dev/null @@ -1 +0,0 @@ -::: opensourceleg.actuators diff --git a/docs/modules/actuators/actuators.md b/docs/modules/actuators/actuators.md new file mode 100644 index 0000000..81e8a46 --- /dev/null +++ b/docs/modules/actuators/actuators.md @@ -0,0 +1 @@ +::: opensourceleg.actuators.base diff --git a/docs/modules/actuators/dephy.md b/docs/modules/actuators/dephy.md new file mode 100644 index 0000000..d281c65 --- /dev/null +++ b/docs/modules/actuators/dephy.md @@ -0,0 +1 @@ +::: opensourceleg.actuators.dephy diff --git a/docs/modules/actuators/moteus.md b/docs/modules/actuators/moteus.md new file mode 100644 index 0000000..ef2dacd --- /dev/null +++ b/docs/modules/actuators/moteus.md @@ -0,0 +1 @@ +::: opensourceleg.actuators.moteus diff --git a/docs/modules/actuators/tmotor.md b/docs/modules/actuators/tmotor.md new file mode 100644 index 0000000..9078da3 --- /dev/null +++ b/docs/modules/actuators/tmotor.md @@ -0,0 +1 @@ +::: opensourceleg.actuators.tmotor diff --git a/docs/modules/collections/validators.md b/docs/modules/collections/validators.md new file mode 100644 index 0000000..7dbcddc --- /dev/null +++ b/docs/modules/collections/validators.md @@ -0,0 +1 @@ +::: opensourceleg.collections.validators diff --git a/docs/modules/control/compiled_controller.md b/docs/modules/control/compiled_controller.md new file mode 100644 index 0000000..23fe334 --- /dev/null +++ b/docs/modules/control/compiled_controller.md @@ -0,0 +1 @@ +::: opensourceleg.control.compiled_controller diff --git a/docs/modules/control/state_machine.md b/docs/modules/control/state_machine.md new file mode 100644 index 0000000..5e53769 --- /dev/null +++ b/docs/modules/control/state_machine.md @@ -0,0 +1 @@ +::: opensourceleg.control.state_machine diff --git a/docs/modules/logging.md b/docs/modules/logging.md new file mode 100644 index 0000000..b0c52dc --- /dev/null +++ b/docs/modules/logging.md @@ -0,0 +1 @@ +::: opensourceleg.logging diff --git a/docs/modules/robots/osl.md b/docs/modules/robots/osl.md new file mode 100644 index 0000000..3a8fc80 --- /dev/null +++ b/docs/modules/robots/osl.md @@ -0,0 +1 @@ +::: opensourceleg.robots.osl diff --git a/docs/modules/robots/robots.md b/docs/modules/robots/robots.md new file mode 100644 index 0000000..21997a6 --- /dev/null +++ b/docs/modules/robots/robots.md @@ -0,0 +1 @@ +::: opensourceleg.robots.base diff --git a/docs/modules/safety.md b/docs/modules/safety.md new file mode 100644 index 0000000..0a5a088 --- /dev/null +++ b/docs/modules/safety.md @@ -0,0 +1 @@ +::: opensourceleg.safety diff --git a/docs/modules/sensors/adc.md b/docs/modules/sensors/adc.md new file mode 100644 index 0000000..0961374 --- /dev/null +++ b/docs/modules/sensors/adc.md @@ -0,0 +1 @@ +::: opensourceleg.sensors.adc diff --git a/docs/modules/sensors/encoder.md b/docs/modules/sensors/encoder.md new file mode 100644 index 0000000..6991e13 --- /dev/null +++ b/docs/modules/sensors/encoder.md @@ -0,0 +1 @@ +::: opensourceleg.sensors.encoder diff --git a/docs/modules/sensors/imu.md b/docs/modules/sensors/imu.md new file mode 100644 index 0000000..ccb5941 --- /dev/null +++ b/docs/modules/sensors/imu.md @@ -0,0 +1 @@ +::: opensourceleg.sensors.imu diff --git a/docs/modules/sensors/loadcell.md b/docs/modules/sensors/loadcell.md new file mode 100644 index 0000000..c8948f2 --- /dev/null +++ b/docs/modules/sensors/loadcell.md @@ -0,0 +1 @@ +::: opensourceleg.sensors.loadcell diff --git a/docs/modules/sensors/sensors_base.md b/docs/modules/sensors/sensors_base.md new file mode 100644 index 0000000..bcf9898 --- /dev/null +++ b/docs/modules/sensors/sensors_base.md @@ -0,0 +1 @@ +::: opensourceleg.sensors.base diff --git a/docs/modules/time.md b/docs/modules/time.md new file mode 100644 index 0000000..7043344 --- /dev/null +++ b/docs/modules/time.md @@ -0,0 +1 @@ +::: opensourceleg.time diff --git a/mkdocs.yml b/mkdocs.yml index cbb96a6..a99e50f 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -9,7 +9,30 @@ copyright: Maintained by Senthur Ay nav: - Overview: index.md - - Modules: modules.md + - Modules: + - Actuators: + - Base: modules/actuators/actuators.md + - Dephy: modules/actuators/dephy.md + - Moteus: modules/actuators/moteus.md + - TMotor: modules/actuators/tmotor.md + - Collections: + - Validators: modules/collections/validators.md + - Control: + - Compiled Controller: modules/control/compiled_controller.md + - State Machine: modules/control/state_machine.md + - Logging: + - Logger: modules/logging.md + - Sensors: + - Base: modules/sensors/sensors_base.md + - IMU: modules/sensors/imu.md + - Loadcell: modules/sensors/loadcell.md + - ADC: modules/sensors/adc.md + - Encoder: modules/sensors/encoder.md + - Robots: + - Base: modules/robots/robots.md + - OSL: modules/robots/osl.md + - Safety: modules/safety.md + - Time: modules/time.md plugins: - search - mkdocstrings: @@ -22,6 +45,13 @@ theme: name: material feature: tabs: true + highlightjs: true + hljs_languages: + - python + - yaml + - bash + - json + - markdown palette: - media: "(prefers-color-scheme: light)" scheme: default @@ -48,7 +78,16 @@ extra: link: https://pypi.org/project/opensourceleg markdown_extensions: + - attr_list + - pymdownx.snippets + - pymdownx.highlight # For syntax highlighting + - pymdownx.superfences + - toc: permalink: true + - pymdownx.arithmatex: generic: true + - def_list + - pymdownx.tasklist: + custom_checkbox: true diff --git a/opensourceleg/actuators/base.py b/opensourceleg/actuators/base.py index 5efd2e6..f310442 100644 --- a/opensourceleg/actuators/base.py +++ b/opensourceleg/actuators/base.py @@ -23,6 +23,22 @@ @dataclass class MOTOR_CONSTANTS: + """ + Class to define the motor constants. + + Examples: + >>> constants = MOTOR_CONSTANTS( + ... MOTOR_COUNT_PER_REV=2048, + ... NM_PER_AMP=0.02, + ... NM_PER_RAD_TO_K=0.001, + ... NM_S_PER_RAD_TO_B=0.0001, + ... MAX_CASE_TEMPERATURE=80.0, + ... MAX_WINDING_TEMPERATURE=120.0 + ... ) + >>> print(constants.MOTOR_COUNT_PER_REV) + 2048 + """ + MOTOR_COUNT_PER_REV: float NM_PER_AMP: float NM_PER_RAD_TO_K: float @@ -31,19 +47,63 @@ class MOTOR_CONSTANTS: MAX_WINDING_TEMPERATURE: float def __post_init__(self) -> None: + """ + Function to validate the motor constants. + + Examples: + >>> # This will raise a ValueError because a negative value is invalid. + >>> MOTOR_CONSTANTS( + ... MOTOR_COUNT_PER_REV=-2048, + ... NM_PER_AMP=0.02, + ... NM_PER_RAD_TO_K=0.001, + ... NM_S_PER_RAD_TO_B=0.0001, + ... MAX_CASE_TEMPERATURE=80.0, + ... MAX_WINDING_TEMPERATURE=120.0 + ... ) + """ if any(x <= 0 for x in self.__dict__.values()): raise ValueError("All values in MOTOR_CONSTANTS must be non-zero and positive.") @property def RAD_PER_COUNT(self) -> float: + """ + Calculate the radians per count. + + Returns: + float: Radians per count. + + Examples: + >>> constants = MOTOR_CONSTANTS(2048, 0.02, 0.001, 0.0001, 80.0, 120.0) + >>> constants.RAD_PER_COUNT + 0.0030679615757712823 + """ return 2 * np.pi / self.MOTOR_COUNT_PER_REV @property def NM_PER_MILLIAMP(self) -> float: + """ + Convert NM per amp to NM per milliamp. + + Returns: + float: NM per milliamp. + + Examples: + >>> constants = MOTOR_CONSTANTS(2048, 0.02, 0.001, 0.0001, 80.0, 120.0) + >>> constants.NM_PER_MILLIAMP + 2e-05 + """ return self.NM_PER_AMP / 1000 class CONTROL_MODES(Enum): + """ + Enum to define various control modes. + + Examples: + >>> CONTROL_MODES.POSITION + + """ + IDLE = -1 POSITION = 0 VOLTAGE = 1 @@ -56,6 +116,15 @@ class CONTROL_MODES(Enum): # TODO: This can be ordered and requires validation @dataclass class ControlGains: + """ + Class to define the control gains. + + Examples: + >>> gains = ControlGains(kp=1.0, ki=0.1, kd=0.01, k=0.5, b=0.05, ff=0.0) + >>> gains.kp + 1.0 + """ + kp: float = 0 ki: float = 0 kd: float = 0 @@ -66,6 +135,30 @@ class ControlGains: @dataclass class ControlModeConfig: + """ + Configuration for a control mode. + + Attributes: + entry_callback (Callable[[Any], None]): Callback to execute when entering this mode. + exit_callback (Callable[[Any], None]): Callback to execute when exiting this mode. + has_gains (bool): Indicates if the control mode utilizes control gains. + max_gains (Union[ControlGains, None]): The maximum allowable control gains (if applicable). + + Examples: + >>> def enter(actuator): + ... print("Entering mode") + >>> def exit(actuator): + ... print("Exiting mode") + >>> config = ControlModeConfig( + ... entry_callback=enter, + ... exit_callback=exit, + ... has_gains=True, + ... max_gains=ControlGains(1.0, 0.1, 0.01, 0.5, 0.05, 0.0) + ... ) + >>> config.has_gains + True + """ + entry_callback: Callable[[Any], None] exit_callback: Callable[[Any], None] has_gains: bool = False @@ -73,6 +166,28 @@ class ControlModeConfig: class CONTROL_MODE_CONFIGS(NamedTuple): + """ + Named tuple containing control mode configurations. + + Attributes: + IDLE (Optional[ControlModeConfig]): Configuration for IDLE mode. + POSITION (Optional[ControlModeConfig]): Configuration for POSITION mode. + CURRENT (Optional[ControlModeConfig]): Configuration for CURRENT mode. + VOLTAGE (Optional[ControlModeConfig]): Configuration for VOLTAGE mode. + IMPEDANCE (Optional[ControlModeConfig]): Configuration for IMPEDANCE mode. + VELOCITY (Optional[ControlModeConfig]): Configuration for VELOCITY mode. + TORQUE (Optional[ControlModeConfig]): Configuration for TORQUE mode. + + Examples: + >>> idle_config = ControlModeConfig( + ... entry_callback=lambda a: print("Idle entered"), + ... exit_callback=lambda a: print("Idle exited") + ... + >>> mode_configs = CONTROL_MODE_CONFIGS(IDLE=idle_config) + >>> mode_configs.IDLE.entry_callback(None) + Idle entered + """ + IDLE: Optional[ControlModeConfig] = None POSITION: Optional[ControlModeConfig] = None CURRENT: Optional[ControlModeConfig] = None @@ -104,11 +219,55 @@ class CONTROL_MODE_CONFIGS(NamedTuple): @runtime_checkable class MethodWithRequiredModes(Protocol): + """ + Protocol for methods that define required control modes. + + Attributes: + _required_modes (set[CONTROL_MODES]): A set of control modes in which the method is permitted. + + Examples: + >>> class Dummy: + ... _required_modes = {CONTROL_MODES.IDLE} + >>> isinstance(Dummy(), MethodWithRequiredModes) + True + """ + _required_modes: set[CONTROL_MODES] def requires(*modes: CONTROL_MODES) -> Callable[[T], T]: + """ + Decorator to specify required control modes for a method. + + The decorator attaches a set of required modes to the function, + ensuring that the function is only active in the specified control modes. + + Args: + *modes (CONTROL_MODES): One or more control modes required for the method. + + Raises: + TypeError: If any argument is not an instance of CONTROL_MODES. + + Returns: + Callable[[T], T]: The decorated function with an attached `_required_modes` attribute. + + Examples: + >>> @requires(CONTROL_MODES.POSITION, CONTROL_MODES.TORQUE) + ... def some_method(self, value): + ... return value + >>> some_method._required_modes # May output: {, } + """ + def decorator(func: T) -> T: + """ + Attach required control modes to the decorated function. + + Args: + func (T): The function to be decorated. + + Returns: + T: The same function with an updated `_required_modes` attribute. + """ if not all(isinstance(mode, CONTROL_MODES) for mode in modes): raise TypeError("All arguments to 'requires' must be of type CONTROL_MODES") @@ -123,6 +282,72 @@ def decorator(func: T) -> T: class ActuatorBase(ABC): + """ + Base class defining the structure and interface for an actuator. + + This abstract class provides common functionality for controlling an actuator, + including managing control mode transitions, restricting method calls based on mode, + and exposing actuator properties. + + Examples: + >>> class DummyActuator(ActuatorBase): + ... @property + ... def _CONTROL_MODE_CONFIGS(self): + ... return CONTROL_MODE_CONFIGS() + ... def start(self): + ... print("Started") + ... def stop(self): + ... print("Stopped") + ... def update(self): + ... print("Updated") + ... def set_motor_voltage(self, value: float) -> None: + ... print(f"Motor voltage set to {value}") + ... def set_motor_current(self, value: float) -> None: + ... print(f"Motor current set to {value}") + ... def set_motor_position(self, value: float) -> None: + ... print(f"Motor position set to {value}") + ... def set_motor_torque(self, value: float) -> None: + ... print(f"Motor torque set to {value}") + ... def set_joint_torque(self, value: float) -> None: + ... print(f"Joint torque set to {value}") + ... def set_current_gains(self, kp: float, ki: float, kd: float, ff: float) -> None: + ... print("Current gains set") + ... def set_position_gains(self, kp: float, ki: float, kd: float, ff: float) -> None: + ... print("Position gains set") + ... def set_impedance_gains(self, kp: float, ki: float, kd: float, k: float, b: float, ff: float) -> None: + ... print("Impedance gains set") + ... def home(self) -> None: + ... print("Homed") + ... @property + ... def motor_position(self) -> float: + ... return 100.0 + ... @property + ... def motor_velocity(self) -> float: + ... return 10.0 + ... @property + ... def motor_voltage(self) -> float: + ... return 24.0 + ... @property + ... def motor_current(self) -> float: + ... return 0.5 + ... @property + ... def motor_torque(self) -> float: + ... return 2.0 + ... @property + ... def case_temperature(self) -> float: + ... return 70.0 + ... @property + ... def winding_temperature(self) -> float: + ... return 90.0 + >>> actuator = DummyActuator( + ... tag="act1", + ... gear_ratio=100, + ... motor_constants=MOTOR_CONSTANTS(2048, 0.02, 0.001, 0.0001, 80.0, 120.0) + ... ) + >>> actuator.start() + Started + """ + def __init__( self, tag: str, @@ -133,6 +358,25 @@ def __init__( *args: Any, **kwargs: Any, ) -> None: + """ + Initialize an actuator. + + Args: + tag (str): A unique identifier for the actuator. + gear_ratio (float): The gear ratio of the actuator. + motor_constants (MOTOR_CONSTANTS): Motor constant configuration parameters. + frequency (int, optional): Control frequency in Hz. Defaults to 1000. + offline (bool, optional): Flag indicating if the actuator operates in offline mode. Defaults to False. + *args (Any): Additional positional arguments. + **kwargs (Any): Additional keyword arguments. + + Examples: + >>> actuator = DummyActuator( + ... tag="act1", + ... gear_ratio=100, + ... motor_constants=MOTOR_CONSTANTS(2048, 0.02, 0.001, 0.0001, 80.0, 120.0) + ... ) + """ self._MOTOR_CONSTANTS: MOTOR_CONSTANTS = motor_constants self._gear_ratio: float = gear_ratio self._tag: str = tag @@ -159,17 +403,71 @@ def __init__( self._set_mutated_methods() def __enter__(self) -> "ActuatorBase": + """ + Enter the runtime context related to this actuator. + + Starts the actuator and returns the instance. + + Returns: + ActuatorBase: The actuator instance. + + Examples: + >>> with actuator as a: + ... print("Inside context") + Started + Inside context + Stopped + """ self.start() return self def __exit__(self, exc_type: Any, exc_value: Any, exc_traceback: Any) -> None: + """ + Exit the runtime context and stop the actuator. + + Args: + exc_type (Any): Exception type, if any. + exc_value (Any): Exception value, if any. + exc_traceback (Any): Exception traceback, if any. + + Examples: + >>> try: + ... with actuator: + ... raise ValueError("Test error") + ... except ValueError: + ... pass # actuator.stop() was automatically called + """ self.stop() def _restricted_method(self, method_name: str, *args: Any, **kwargs: Any) -> None: + """ + Fallback method for restricted operations. + + Logs an error indicating that the requested method is not available + in the current control mode. + + Args: + method_name (str): Name of the restricted method. + *args (Any): Positional arguments passed to the method. + **kwargs (Any): Keyword arguments passed to the method. + + Examples: + >>> actuator._restricted_method("set_motor_voltage") + # (Logs an error message and returns None) + """ LOGGER.error(f"{method_name}() is not available in {self._mode.name} mode.") return None def _set_original_methods(self) -> None: + """ + Store the original methods that require specific control modes. + + Iterates through known control mode methods and saves those that have + a `_required_modes` attribute to allow mode-based restrictions. + + Examples: + >>> print(actuator._original_methods) # Dictionary of method names to methods + """ for method_name in CONTROL_MODE_METHODS: try: method = getattr(self, method_name) @@ -179,6 +477,16 @@ def _set_original_methods(self) -> None: LOGGER.debug(msg=f"[{self.tag}] {method_name}() is not implemented in {self.tag}.") def _set_mutated_methods(self) -> None: + """ + Update actuator methods based on the current control mode. + + For each method stored in `_original_methods`, if the current control mode + is permitted, the original method is used; otherwise, a restricted version + that logs an error is assigned. + + Examples: + >>> actuator._set_mutated_methods() + """ for method_name, method in self._original_methods.items(): if self._mode in method._required_modes: setattr(self, method_name, method) @@ -188,27 +496,90 @@ def _set_mutated_methods(self) -> None: @property @abstractmethod def _CONTROL_MODE_CONFIGS(self) -> CONTROL_MODE_CONFIGS: + """ + Abstract property to obtain control mode configurations. + + Returns: + CONTROL_MODE_CONFIGS: The configuration settings for each control mode. + + Examples: + >>> config = actuator._CONTROL_MODE_CONFIGS # Implemented in subclass + """ pass @abstractmethod def start(self) -> None: + """ + Start the actuator. + + Must be implemented by subclasses to initialize and activate the actuator. + + Examples: + >>> actuator.start() + Started + """ pass @abstractmethod def stop(self) -> None: + """ + Stop the actuator. + + Must be implemented by subclasses to safely deactivate the actuator. + + Examples: + >>> actuator.stop() + Stopped + """ pass @abstractmethod def update(self) -> None: + """ + Update the actuator's state. + + Must be implemented by subclasses to refresh or recalculate state values. + + Examples: + >>> actuator.update() + Updated + """ pass def _get_control_mode_config(self, mode: CONTROL_MODES) -> Optional[ControlModeConfig]: + """ + Retrieve the control mode configuration for a specified mode. + + Args: + mode (CONTROL_MODES): The control mode for which to retrieve the configuration. + + Returns: + Optional[ControlModeConfig]: The configuration if it exists; otherwise, None. + + Examples: + >>> config = actuator._get_control_mode_config(CONTROL_MODES.IDLE) + >>> print(config) + None + """ return cast( Optional[ControlModeConfig], getattr(self._CONTROL_MODE_CONFIGS, mode.name), ) def set_control_mode(self, mode: CONTROL_MODES) -> None: + """ + Set the actuator's control mode. + + If the mode is changing, the exit callback for the current mode and + the entry callback for the new mode are executed, and methods are updated + to reflect any restrictions imposed by the new mode. + + Args: + mode (CONTROL_MODES): The new control mode to be set. + + Examples: + >>> actuator.set_control_mode(CONTROL_MODES.POSITION) + """ if self.mode == mode: LOGGER.debug(msg=f"[{self.tag}] Already in {self.mode.name} control mode.") return @@ -228,186 +599,633 @@ def set_control_mode(self, mode: CONTROL_MODES) -> None: @abstractmethod @requires(CONTROL_MODES.VOLTAGE) def set_motor_voltage(self, value: float) -> None: + """ + Set the motor voltage. + + Args: + value (float): The voltage value to be applied to the motor. + + Must be implemented by subclasses. + + Examples: + >>> actuator.set_motor_voltage(12.0) + """ pass @abstractmethod @requires(CONTROL_MODES.CURRENT) def set_motor_current(self, value: float) -> None: + """ + Set the motor current. + + Args: + value (float): The current value to be applied to the motor. + + Must be implemented by subclasses. + + Examples: + >>> actuator.set_motor_current(1.5) + """ pass @abstractmethod @requires(CONTROL_MODES.POSITION) def set_motor_position(self, value: float) -> None: + """ + Set the motor position. + + Args: + value (float): The target motor position in radians. + + Must be implemented by subclasses. + + Examples: + >>> actuator.set_motor_position(0.5) + """ pass @requires( CONTROL_MODES.POSITION - ) # This needs to be tested as set_motor_position is already decorated with requires + ) # TODO: This needs to be tested as set_motor_position is already decorated with requires def set_output_position(self, value: float) -> None: + """ + Set the output position of the actuator. + + Converts the desired output position (in radians) to a motor position by + applying the gear ratio, then delegates to `set_motor_position`. + + Args: + value (float): The desired output position in radians. + + Examples: + >>> # Assuming gear_ratio is 100, this will set the motor position to 100 * value. + >>> actuator.set_motor_position = lambda value: print(f"Motor position set to {value}") + >>> actuator.set_output_position(1.0) + Motor position set to 100.0 + """ self.set_motor_position(value=value * self.gear_ratio) @abstractmethod @requires(CONTROL_MODES.TORQUE) def set_motor_torque(self, value: float) -> None: + """ + Set the motor torque. + + Args: + value (float): The torque value to be applied to the motor. + + Must be implemented by subclasses. + + Examples: + >>> actuator.set_motor_torque(5.0) + """ pass @abstractmethod @requires(CONTROL_MODES.TORQUE) def set_joint_torque(self, value: float) -> None: + """ + Set the joint torque. + + Args: + value (float): The torque value to be applied to the joint. + + Must be implemented by subclasses. + + Examples: + >>> actuator.set_joint_torque(5.0) + """ pass @abstractmethod @requires(CONTROL_MODES.CURRENT) def set_current_gains(self, kp: float, ki: float, kd: float, ff: float) -> None: + """ + Set the current control gains. + + Args: + kp (float): Proportional gain. + ki (float): Integral gain. + kd (float): Derivative gain. + ff (float): Feed-forward gain. + + Must be implemented by subclasses. + + Examples: + >>> actuator.set_current_gains(1.0, 0.1, 0.01, 0.0) + """ pass @abstractmethod @requires(CONTROL_MODES.POSITION) def set_position_gains(self, kp: float, ki: float, kd: float, ff: float) -> None: + """ + Set the position control gains. + + Args: + kp (float): Proportional gain. + ki (float): Integral gain. + kd (float): Derivative gain. + ff (float): Feed-forward gain. + + Must be implemented by subclasses. + + Examples: + >>> actuator.set_position_gains(1.0, 0.1, 0.01, 0.0) + """ pass @abstractmethod @requires(CONTROL_MODES.IMPEDANCE) def set_impedance_gains(self, kp: float, ki: float, kd: float, k: float, b: float, ff: float) -> None: + """ + Set the impedance control gains. + + Args: + kp (float): Proportional gain. + ki (float): Integral gain. + kd (float): Derivative gain. + k (float): Stiffness coefficient. + b (float): Damping coefficient. + ff (float): Feed-forward gain. + + Must be implemented by subclasses. + + Examples: + >>> actuator.set_impedance_gains(1.0, 0.1, 0.01, 0.5, 0.05, 0.0) + """ pass @abstractmethod def home(self) -> None: + """ + Home the actuator. + + Aligns the actuator to a known reference position. + Must be implemented by subclasses. + + Examples: + >>> actuator.home() + Homed + """ pass def set_motor_zero_position(self, value: float) -> None: - """Sets motor zero position in radians""" + """ + Set the motor zero position. + + Args: + value (float): The motor zero position in radians. + + Examples: + >>> actuator.set_motor_zero_position(0.0) + >>> actuator.motor_zero_position + 0.0 + """ self._motor_zero_position = value def set_motor_position_offset(self, value: float) -> None: - """Sets joint offset position in radians""" + """ + Set the motor position offset. + + Args: + value (float): The motor position offset in radians. + + Examples: + >>> actuator.set_motor_position_offset(0.1) + >>> actuator.motor_position_offset + 0.1 + """ self._motor_position_offset = value def set_joint_zero_position(self, value: float) -> None: - """Sets joint zero position in radians""" + """ + Set the joint zero position. + + Args: + value (float): The joint zero position in radians. + + Examples: + >>> actuator.set_joint_zero_position(0.0) + >>> actuator.joint_zero_position + 0.0 + """ self._joint_zero_position = value def set_joint_position_offset(self, value: float) -> None: - """Sets joint offset position in radians""" + """ + Set the joint position offset. + + Args: + value (float): The joint position offset in radians. + + Examples: + >>> actuator.set_joint_position_offset(0.1) + >>> actuator.joint_position_offset + 0.1 + """ self._joint_position_offset = value def set_joint_direction(self, value: int) -> None: - """Sets joint direction to 1 or -1""" + """ + Set the joint direction. + + Args: + value (int): The joint direction, expected to be either 1 or -1. + + Examples: + >>> actuator.set_joint_direction(-1) + >>> actuator.joint_direction + -1 + """ self._joint_direction = value @property @abstractmethod def motor_position(self) -> float: + """ + Get the motor position. + + Returns: + float: The current motor position in radians. + + Must be implemented by subclasses. + + Examples: + >>> pos = actuator.motor_position + >>> print(pos) + 100.0 + """ pass @property def output_position(self) -> float: """ - Position of the output in radians. - This is calculated by scaling the motor angle with the gear ratio. - Note that this method does not consider compliance from an SEA. + Get the output position. + + Returns: + float: The output position in radians, calculated by dividing the motor + position by the gear ratio. Note that this does not account for SEA compliance. + + Examples: + >>> # If motor_position is 100.0 and gear_ratio is 100, output_position will be 1.0 + >>> actuator.output_position + 1.0 """ return self.motor_position / self.gear_ratio @property @abstractmethod def motor_velocity(self) -> float: + """ + Get the motor velocity. + + Returns: + float: The current motor velocity in radians per second. + + Must be implemented by subclasses. + + Examples: + >>> velocity = actuator.motor_velocity + >>> print(velocity) + 10.0 + """ pass @property def output_velocity(self) -> float: """ - Velocity of the output in radians. - This is calculated by scaling the motor angle with the gear ratio. - Note that this method does not consider compliance from an SEA. + Get the output velocity. + + Returns: + float: The output velocity in radians per second, calculated by dividing the motor + velocity by the gear ratio. Note that this does not account for SEA compliance. + + Examples: + >>> # If motor_velocity is 10.0 and gear_ratio is 100, output_velocity will be 0.1 + >>> actuator.output_velocity + 0.1 """ return self.motor_velocity / self.gear_ratio @property @abstractmethod def motor_voltage(self) -> float: + """ + Get the motor voltage. + + Returns: + float: The current motor voltage. + + Must be implemented by subclasses. + + Examples: + >>> voltage = actuator.motor_voltage + >>> print(voltage) + 24.0 + """ pass @property @abstractmethod def motor_current(self) -> float: + """ + Get the motor current. + + Returns: + float: The current motor current. + + Must be implemented by subclasses. + + Examples: + >>> current = actuator.motor_current + >>> print(current) + 0.5 + """ pass @property @abstractmethod def motor_torque(self) -> float: + """ + Get the motor torque. + + Returns: + float: The current motor torque. + + Must be implemented by subclasses. + + Examples: + >>> torque = actuator.motor_torque + >>> print(torque) + 2.0 + """ pass @property def MOTOR_CONSTANTS(self) -> MOTOR_CONSTANTS: + """ + Get the motor constants configuration. + + Returns: + MOTOR_CONSTANTS: The motor constants. + + Examples: + >>> constants = actuator.MOTOR_CONSTANTS + >>> constants.MAX_CASE_TEMPERATURE + 80.0 + """ return self._MOTOR_CONSTANTS @property def mode(self) -> CONTROL_MODES: + """ + Get the current control mode. + + Returns: + CONTROL_MODES: The actuator's current control mode. + + Examples: + >>> actuator.mode + + """ return self._mode @property def tag(self) -> str: + """ + Get the actuator tag. + + Returns: + str: The unique identifier for the actuator. + + Examples: + >>> actuator.tag + "act1" + """ return self._tag @property def is_homed(self) -> bool: + """ + Check if the actuator has been homed. + + Returns: + bool: True if the actuator is homed; otherwise, False. + + Examples: + >>> actuator.is_homed + False + """ return self._is_homed @property def frequency(self) -> int: + """ + Get the actuator's control frequency. + + Returns: + int: The control frequency in Hz. + + Examples: + >>> actuator.frequency + 1000 + """ return self._frequency @property def is_offline(self) -> bool: + """ + Check if the actuator is in offline mode. + + Returns: + bool: True if offline; otherwise, False. + + Examples: + >>> actuator.is_offline + False + """ return self._is_offline @property def gear_ratio(self) -> float: + """ + Get the gear ratio. + + Returns: + float: The gear ratio of the actuator. + + Examples: + >>> actuator.gear_ratio + 100 + """ return self._gear_ratio @property def max_case_temperature(self) -> float: + """ + Get the maximum allowed case temperature. + + Returns: + float: The maximum case temperature defined in motor constants. + + Examples: + >>> actuator.max_case_temperature + 80.0 + """ return self._MOTOR_CONSTANTS.MAX_CASE_TEMPERATURE @property @abstractmethod def case_temperature(self) -> float: + """ + Get the current case temperature. + + Returns: + float: The current case temperature. + + Must be implemented by subclasses. + + Examples: + >>> temp = actuator.case_temperature + >>> print(temp) + 70.0 + """ pass @property @abstractmethod def winding_temperature(self) -> float: + """ + Get the current winding temperature. + + Returns: + float: The current winding temperature. + + Must be implemented by subclasses. + + Examples: + >>> temp = actuator.winding_temperature + >>> print(temp) + 90.0 + """ pass @property def max_winding_temperature(self) -> float: + """ + Get the maximum allowed winding temperature. + + Returns: + float: The maximum winding temperature defined in motor constants. + + Examples: + >>> actuator.max_winding_temperature + 120.0 + """ return self._MOTOR_CONSTANTS.MAX_WINDING_TEMPERATURE @property def motor_zero_position(self) -> float: + """ + Get the motor zero position. + + Returns: + float: The motor zero position in radians. + + Examples: + >>> actuator.set_motor_zero_position(0.0) + >>> actuator.motor_zero_position + 0.0 + """ return self._motor_zero_position @property def motor_position_offset(self) -> float: + """ + Get the motor position offset. + + Returns: + float: The motor position offset in radians. + + Examples: + >>> actuator.set_motor_position_offset(0.1) + >>> actuator.motor_position_offset + 0.1 + """ return self._motor_position_offset @property def joint_zero_position(self) -> float: + """ + Get the joint zero position. + + Returns: + float: The joint zero position in radians. + + Examples: + >>> actuator.set_joint_zero_position(0.0) + >>> actuator.joint_zero_position + 0.0 + """ return self._joint_zero_position @property def joint_position_offset(self) -> float: + """ + Get the joint position offset. + + Returns: + float: The joint position offset in radians. + + Examples: + >>> actuator.set_joint_position_offset(0.1) + >>> actuator.joint_position_offset + 0.1 + """ return self._joint_position_offset @property def joint_direction(self) -> int: + """ + Get the joint direction. + + Returns: + int: The joint direction (1 or -1). + + Examples: + >>> actuator.set_joint_direction(-1) + >>> actuator.joint_direction + -1 + """ return self._joint_direction @property def is_open(self) -> bool: + """ + Check if the actuator is open. + + Returns: + bool: True if open; otherwise, False. + + Examples: + >>> actuator._is_open = True + >>> actuator.is_open + True + """ return self._is_open @property def is_streaming(self) -> bool: + """ + Check if the actuator is streaming data. + + Returns: + bool: True if streaming; otherwise, False. + + Examples: + >>> actuator._is_streaming = True + >>> actuator.is_streaming + True + """ return self._is_streaming diff --git a/opensourceleg/actuators/decorators.py b/opensourceleg/actuators/decorators.py index df0de83..c518b6a 100644 --- a/opensourceleg/actuators/decorators.py +++ b/opensourceleg/actuators/decorators.py @@ -9,6 +9,31 @@ def check_actuator_connection(func: Callable) -> Callable: + """ + Decorator that verifies the actuator is connected before executing the method. + + This decorator checks if the actuator is operating online. If the actuator is + offline, it raises an ActuatorConnectionException using the actuator's tag. + + Args: + func (Callable): The method to wrap. It is expected to be an instance method of ActuatorBase. + + Returns: + Callable: The wrapped method that executes only if the actuator is online. + + Raises: + ActuatorConnectionException: If the actuator is offline. + + Examples: + >>> class MyActuator(ActuatorBase): + ... @check_actuator_connection + ... def my_method(self): + ... return "Hello, world!" + ... + >>> actuator = MyActuator() + >>> actuator.my_method() + """ + @wraps(func) def wrapper(self: ActuatorBase, *args: Any, **kwargs: Any) -> Any: if self.is_offline: @@ -20,6 +45,32 @@ def wrapper(self: ActuatorBase, *args: Any, **kwargs: Any) -> Any: def check_actuator_open(func: Callable) -> Callable: + """ + Decorator that ensures the actuator is open before executing the method. + + This decorator checks if the actuator is in an open state. If it is not open, + it raises an ActuatorConnectionException with the actuator's tag. + + Args: + func (Callable): The method to wrap. It is expected to be an instance method of ActuatorBase. + + Returns: + Callable: The wrapped method that executes only if the actuator is open. + + Raises: + ActuatorConnectionException: If the actuator is not open. + + Example: + >>> class MyActuator(ActuatorBase): + ... @check_actuator_open + ... def my_method(self): + ... return "Hello, world!" + ... + >>> actuator = MyActuator() + >>> actuator.my_method() + + """ + @wraps(func) def wrapper(self: ActuatorBase, *args: Any, **kwargs: Any) -> Any: if not self.is_open: @@ -31,6 +82,31 @@ def wrapper(self: ActuatorBase, *args: Any, **kwargs: Any) -> Any: def check_actuator_stream(func: Callable) -> Callable: + """ + Decorator that verifies the actuator is streaming data before executing the method. + + This decorator checks if the actuator is actively streaming data. If the actuator + is not streaming, it raises an ActuatorStreamException using the actuator's tag. + + Args: + func (Callable): The method to wrap. It is expected to be an instance method of ActuatorBase. + + Returns: + Callable: The wrapped method that executes only if the actuator is streaming. + + Raises: + ActuatorStreamException: If the actuator is not streaming. + + Example: + >>> class MyActuator(ActuatorBase): + ... @check_actuator_stream + ... def my_method(self): + ... return "Hello, world!" + ... + >>> actuator = MyActuator() + >>> actuator.my_method() + """ + @wraps(func) def wrapper(self: ActuatorBase, *args: Any, **kwargs: Any) -> Any: if not self.is_streaming: diff --git a/opensourceleg/actuators/dephy.py b/opensourceleg/actuators/dephy.py index 30fd18e..e689a53 100644 --- a/opensourceleg/actuators/dephy.py +++ b/opensourceleg/actuators/dephy.py @@ -120,6 +120,16 @@ def _dephy_impedance_mode_exit(dephy_actuator: "DephyActuator") -> None: class DephyActuator(Device, ActuatorBase): # type: ignore[no-any-unimported] + """ + Interface to a Dephy actuator device. + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0', gear_ratio=2.0) + >>> actuator.start() + >>> actuator.set_motor_voltage(1500) + >>> print(f"Joint position: {actuator.joint_position:.2f} rad") + """ + def __init__( self, tag: str = "DephyActuator", @@ -141,6 +151,9 @@ def __init__( frequency=frequency, offline=offline, ) + """ + + """ self._debug_level: int = debug_level if dephy_log else 6 self._dephy_log: bool = dephy_log @@ -177,6 +190,17 @@ def _CONTROL_MODE_CONFIGS(self) -> CONTROL_MODE_CONFIGS: @check_actuator_connection def start(self) -> None: + """ + Starts the actuator by opening the port, starting data streaming, + reading initial data, and setting the control mode to VOLTAGE. + + Returns: + None + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + """ try: self.open() self._is_open = True @@ -198,6 +222,19 @@ def start(self) -> None: @check_actuator_stream @check_actuator_open def stop(self) -> None: + """ + Stops the actuator by stopping the motor, switching to IDLE mode, + stopping data streaming, and closing the connection. + + Returns: + None + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> # ... perform control tasks ... + >>> actuator.stop() + """ self.stop_motor() self.set_control_mode(mode=CONTROL_MODES.IDLE) self._is_streaming = False @@ -205,6 +242,19 @@ def stop(self) -> None: self.close() def update(self) -> None: + """ + Updates the actuator's data by reading new values and updating the thermal model. + It raises exceptions if thermal limits are exceeded. + + Returns: + None + + Examples: + >>> actuator = DephyActuator() + >>> actuator.start() + >>> actuator.update() + >>> print(f"Motor current: {actuator.motor_current} mA") + """ self._data = self.read() self._thermal_model.T_c = self.case_temperature @@ -259,6 +309,10 @@ def home( This is used to detect if the actuator or joint has hit a hard stop. Default is 5000 mA. velocity_threshold (float): Velocity threshold in rad/s to stop homing the joint or actuator. This is also used to detect if the actuator or joint has hit a hard stop. Default is 0.001 rad/s. + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> actuator.home(homing_voltage=2000, homing_direction=-1) """ is_homing = True @@ -319,11 +373,22 @@ def make_encoder_map(self, overwrite: bool = False) -> None: This method makes a lookup table to calculate the position measured by the joint encoder. This is necessary because the magnetic output encoders are nonlinear. By making the map while the joint is unloaded, joint position calculated by motor position * gear ratio - should be the same as the true joint position. + should be the same as the true joint position. Output from this function is a file containing a_i values + parameterizing the map. - Output from this function is a file containing a_i values parameterizing the map + Args: + overwrite (bool): Whether to overwrite the existing encoder map. Default is False. + + Returns: + None + + Eqn: + position = sum from i=0^5 (a_i*counts^i) - Eqn: position = sum from i=0^5 (a_i*counts^i) + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> actuator.make_encoder_map(overwrite=True) Author: Kevin Best U-M Locolab | Neurobionics Lab @@ -389,6 +454,12 @@ def set_motor_torque(self, value: float) -> None: Sets the motor torque in Nm. This is the torque that is applied to the motor rotor, not the joint or output. Args: value (float): The torque to set in Nm. + Returns: + None + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> actuator.set_motor_torque(0.1) """ self.set_motor_current( value / self.MOTOR_CONSTANTS.NM_PER_MILLIAMP, @@ -401,6 +472,14 @@ def set_joint_torque(self, value: float) -> None: Args: value (float): torque in N_m + + Returns: + None + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> actuator.set_joint_torque(0.1) """ self.set_motor_torque(value=value / self.gear_ratio) @@ -412,6 +491,14 @@ def set_output_torque(self, value: float) -> None: Args: value (float): torque in N_m + + Returns: + None + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> actuator.set_output_torque(0.1) """ self.set_motor_torque(value=value / self.gear_ratio) @@ -424,6 +511,13 @@ def set_motor_current( Args: value (float): The current to set in mA. + Returns: + None + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> actuator.set_motor_current(1000) """ self.command_motor_current(value=int(value)) @@ -437,6 +531,14 @@ def set_motor_voltage(self, value: float) -> None: Args: value (float): The voltage to set in mV. + + Returns: + None + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> actuator.set_motor_voltage(100) TODO: Validate number """ self.command_motor_voltage(value=int(value)) @@ -451,6 +553,14 @@ def set_motor_position(self, value: float) -> None: Args: value (float): The position to set + + Returns: + None + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> actuator.set_motor_position(0.1) """ self.command_motor_position( value=int( @@ -473,6 +583,14 @@ def set_position_gains( ki (float): The integral gain kd (float): The derivative gain ff (float): The feedforward gain + + Returns: + None + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> actuator.set_position_gains(kp=30, ki=0, kd=0, ff=0) """ self.set_gains( kp=int(kp), @@ -498,6 +616,14 @@ def set_current_gains( ki (float): The integral gain kd (float): The derivative gain ff (float): The feedforward gain + + Returns: + None + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> actuator.set_current_gains(kp=40, ki=400, kd=0, ff=128) """ self.set_gains( kp=int(kp), @@ -532,6 +658,14 @@ def set_output_impedance( k (float): Spring constant. Defaults to 100 Nm/rad. b (float): Damping constant. Defaults to 3.0 Nm/rad/s. ff (float): Feedforward gain. Defaults to 128. + + Returns: + None + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> actuator.set_output_impedance(kp=40, ki=400, kd=0, k=100, b=3, ff=128) """ self.set_motor_impedance( kp=kp, @@ -562,6 +696,14 @@ def set_impedance_gains( k (float): The spring constant b (float): The damping constant ff (float): The feedforward gain + + Returns: + None + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> actuator.set_impedance_gains(kp=40, ki=400, kd=0, k=200, b=400, ff=128) """ self.set_gains( kp=int(kp), @@ -591,6 +733,14 @@ def set_motor_impedance( k (float): Spring constant. Defaults to 0.08922 Nm/rad. b (float): Damping constant. Defaults to 0.0038070 Nm/rad/s. ff (float): Feedforward gain. Defaults to 128. + + Returns: + None + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> actuator.set_motor_impedance(kp=40, ki=400, kd=0, k=0.08922, b=0.0038070, ff=128) TODO: Validate numbers """ self.set_impedance_gains( kp=kp, @@ -602,12 +752,35 @@ def set_motor_impedance( ) def set_encoder_map(self, encoder_map: np.polynomial.polynomial.Polynomial) -> None: - """Sets the joint encoder map""" + """ + Sets the joint encoder map + + Args: + encoder_map (np.polynomial.polynomial.Polynomial): The encoder map to set + + Returns: + None + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> actuator.set_encoder_map(np.polynomial.polynomial.Polynomial(coef=[1, 2, 3, 4, 5])) + """ self._encoder_map: np.polynomial.polynomial.Polynomial = encoder_map @property def encoder_map(self) -> Optional[np.polynomial.polynomial.Polynomial]: - """Polynomial coefficients defining the joint encoder map from counts to radians.""" + """ + Polynomial coefficients defining the joint encoder map from counts to radians. + + Returns: + Optional[np.polynomial.polynomial.Polynomial]: The encoder map + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> print(actuator.encoder_map) + """ if getattr(self, "_encoder_map", None) is not None: return self._encoder_map else: @@ -616,7 +789,17 @@ def encoder_map(self) -> Optional[np.polynomial.polynomial.Polynomial]: @property def motor_voltage(self) -> float: - """Q-axis motor voltage in mV.""" + """ + Q-axis motor voltage in mV. + + Returns: + float: Motor voltage in mV. + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> print(f"Motor voltage: {actuator.motor_voltage} mV") + """ if self._data is not None: return float(self._data["mot_volt"]) else: @@ -627,6 +810,17 @@ def motor_voltage(self) -> float: @property def motor_current(self) -> float: + """ + Motor current in mA. + + Returns: + float: Motor current in mA. + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> print(f"Motor current: {actuator.motor_current} mA") + """ if self._data is not None: return float(self._data["mot_cur"]) else: @@ -637,6 +831,17 @@ def motor_current(self) -> float: @property def motor_torque(self) -> float: + """ + Torque at the motor output in Nm. + + Returns: + float: Motor torque in Nm. + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> print(f"Motor torque: {actuator.motor_torque} Nm") + """ if self._data is not None: return float(self._data["mot_cur"] * self.MOTOR_CONSTANTS.NM_PER_MILLIAMP) else: @@ -647,6 +852,17 @@ def motor_torque(self) -> float: @property def motor_position(self) -> float: + """ + Motor position in radians. + + Returns: + float: Motor position in radians. + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> print(f"Motor position: {actuator.motor_position} rad") + """ if self._data is not None: return ( float(self._data["mot_ang"] * self.MOTOR_CONSTANTS.RAD_PER_COUNT) @@ -661,7 +877,17 @@ def motor_position(self) -> float: @property def motor_encoder_counts(self) -> int: - """Raw reading from motor encoder in counts.""" + """ + Raw reading from motor encoder in counts. + + Returns: + int: Motor encoder counts. + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> print(f"Motor encoder counts: {actuator.motor_encoder_counts}") + """ if self._data is not None: return int(self._data["mot_ang"]) else: @@ -672,7 +898,17 @@ def motor_encoder_counts(self) -> int: @property def joint_encoder_counts(self) -> int: - """Raw reading from joint encoder in counts.""" + """ + Raw reading from joint encoder in counts. + + Returns: + int: Joint encoder counts. + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> print(f"Joint encoder counts: {actuator.joint_encoder_counts}") + """ if self._data is not None: return int(self._data["ank_ang"]) else: @@ -683,6 +919,18 @@ def joint_encoder_counts(self) -> int: @property def motor_velocity(self) -> float: + """ + Motor velocity in rad/s. + + Returns: + float: Motor velocity in rad/s. + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> print(f"Motor velocity: {actuator.motor_velocity} rad/s") + """ + if self._data is not None: return int(self._data["mot_vel"]) * RAD_PER_DEG else: @@ -693,6 +941,18 @@ def motor_velocity(self) -> float: @property def motor_acceleration(self) -> float: + """ + Motor acceleration in rad/s^2. + + Returns: + float: Motor acceleration in rad/s^2. + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> print(f"Motor acceleration: {actuator.motor_acceleration} rad/s^2") + """ + if self._data is not None: return float(self._data["mot_acc"]) else: @@ -703,7 +963,17 @@ def motor_acceleration(self) -> float: @property def battery_voltage(self) -> float: - """Battery voltage in mV.""" + """ + Battery voltage in mV. + + Returns: + float: Battery voltage in mV. + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> print(f"Battery voltage: {actuator.battery_voltage} mV") + """ if self._data is not None: return float(self._data["batt_volt"]) else: @@ -714,7 +984,17 @@ def battery_voltage(self) -> float: @property def battery_current(self) -> float: - """Battery current in mA.""" + """ + Battery current in mA. + + Returns: + float: Battery current in mA. + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> print(f"Battery current: {actuator.battery_current} mA") + """ if self._data is not None: return float(self._data["batt_curr"]) else: @@ -725,6 +1005,17 @@ def battery_current(self) -> float: @property def joint_position(self) -> float: + """ + Joint position in radians. + + Returns: + float: Joint position in radians. + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> print(f"Joint position: {actuator.joint_position} rad") + """ if self._data is not None: return ( float(self._data["ank_ang"] * self.MOTOR_CONSTANTS.RAD_PER_COUNT) @@ -739,6 +1030,17 @@ def joint_position(self) -> float: @property def joint_velocity(self) -> float: + """ + Joint velocity in rad/s. + + Returns: + float: Joint velocity in rad/s. + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> print(f"Joint velocity: {actuator.joint_velocity} rad/s") + """ if self._data is not None: return float(self._data["ank_vel"] * RAD_PER_DEG) * self.joint_direction else: @@ -750,13 +1052,31 @@ def joint_velocity(self) -> float: @property def joint_torque(self) -> float: """ - Torque at the joint output in Nm. - This is calculated using motor current, k_t, and the gear ratio. + Torque at the joint output in Nm. This is calculated using motor current, k_t, and the gear ratio. + + Returns: + float: Joint torque in Nm. + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> print(f"Joint torque: {actuator.joint_torque} Nm") """ return self.motor_torque * self.gear_ratio @property def case_temperature(self) -> float: + """ + Case temperature in degrees celsius. This is read during actuator update. + + Returns: + float: Case temperature in degrees celsius. + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> print(f"Case temperature: {actuator.case_temperature} C") + """ if self._data is not None: return float(self._data["temperature"]) else: @@ -770,6 +1090,14 @@ def winding_temperature(self) -> float: """ ESTIMATED temperature of the windings in celsius. This is calculated based on the thermal model using motor current. + + Returns: + float: Winding temperature in degrees celsius. + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> print(f"Winding temperature: {actuator.winding_temperature} C") """ if self._data is not None: return float(self._thermal_model.T_w) @@ -801,6 +1129,14 @@ def accelx(self) -> float: """ Acceleration in x direction in m/s^2. Measured using actpack's onboard IMU. + + Returns: + float: Acceleration in x direction in m/s^2. + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> print(f"Acceleration in x direction: {actuator.accelx} m/s^2") """ if self._data is not None: return float(self._data["accelx"] * M_PER_SEC_SQUARED_ACCLSB) @@ -815,6 +1151,14 @@ def accely(self) -> float: """ Acceleration in y direction in m/s^2. Measured using actpack's onboard IMU. + + Returns: + float: Acceleration in y direction in m/s^2. + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> print(f"Acceleration in y direction: {actuator.accely} m/s^2") """ if self._data is not None: return float(self._data["accely"] * M_PER_SEC_SQUARED_ACCLSB) @@ -829,6 +1173,14 @@ def accelz(self) -> float: """ Acceleration in z direction in m/s^2. Measured using actpack's onboard IMU. + + Returns: + float: Acceleration in z direction in m/s^2. + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> print(f"Acceleration in z direction: {actuator.accelz} m/s^2") """ if self._data is not None: return float(self._data["accelz"] * M_PER_SEC_SQUARED_ACCLSB) @@ -843,6 +1195,14 @@ def gyrox(self) -> float: """ Angular velocity in x direction in rad/s. Measured using actpack's onboard IMU. + + Returns: + float: Angular velocity in x direction in rad/s. + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> print(f"Angular velocity in x direction: {actuator.gyrox} rad/s") """ if self._data is not None: return float(self._data["gyrox"] * RAD_PER_SEC_GYROLSB) @@ -857,6 +1217,14 @@ def gyroy(self) -> float: """ Angular velocity in y direction in rad/s. Measured using actpack's onboard IMU. + + Returns: + float: Angular velocity in y direction in rad/s. + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> print(f"Angular velocity in y direction: {actuator.gyroy} rad/s") """ if self._data is not None: return float(self._data["gyroy"] * RAD_PER_SEC_GYROLSB) @@ -871,6 +1239,14 @@ def gyroz(self) -> float: """ Angular velocity in z direction in rad/s. Measured using actpack's onboard IMU. + + Returns: + float: Angular velocity in z direction in rad/s. + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> print(f"Angular velocity in z direction: {actuator.gyroz} rad/s") """ if self._data is not None: return float(self._data["gyroz"] * RAD_PER_SEC_GYROLSB) @@ -886,6 +1262,17 @@ def thermal_scaling_factor(self) -> float: Scale factor to use in torque control, in [0,1]. If you scale the torque command by this factor, the motor temperature will never exceed max allowable temperature. For a proof, see paper referenced in thermal model. + + Returns: + float: Thermal scaling factor. + + Examples: + >>> actuator = DephyActuator(port='/dev/ttyACM0') + >>> actuator.start() + >>> print(f"Thermal scaling factor: {actuator.thermal_scaling_factor}") + >>> actuator.update() + >>> # This will update the thermal model and return the new scaling factor. + >>> print(f"Thermal scaling factor: {actuator.thermal_scaling_factor}") """ return self._thermal_scale diff --git a/opensourceleg/actuators/tmotor.py b/opensourceleg/actuators/tmotor.py index b349279..c379d6e 100644 --- a/opensourceleg/actuators/tmotor.py +++ b/opensourceleg/actuators/tmotor.py @@ -82,12 +82,12 @@ class TMotorMITCANActuator(ActuatorBase, TMotorManager_mit_can): def __init__( self, tag: str = "TMotorActuator", - motor_type="AK80-9", - motor_ID=41, + motor_type: str = "AK80-9", + motor_ID: int = 41, gear_ratio: float = 1.0, frequency: int = 500, offline: bool = False, - max_mosfett_temp=50, + max_mosfett_temp: float = 50, ): """ Sets up the motor manager. Note the device will not be powered on by this method! You must @@ -409,7 +409,14 @@ def output_torque(self) -> float: return float(self.motor_current * MIT_Params[self.type]["Kt_actual"] * MIT_Params[self.type]["GEAR_RATIO"]) # uses plain impedance mode, will send 0.0 for current command. - def set_impedance_gains(self, kp=0, ki=0, K=0.08922, B=0.0038070, ff=0) -> None: + def set_impedance_gains( + self, + kp: float = 0, + ki: float = 0, + K: float = 0.08922, + B: float = 0.0038070, + ff: float = 0, + ) -> None: """ Uses plain impedance mode, will send 0.0 for current command in addition to position request. @@ -436,7 +443,13 @@ def set_impedance_gains(self, kp=0, ki=0, K=0.08922, B=0.0038070, ff=0) -> None: self._command.kd = B self._command.velocity = 0.0 - def set_current_gains(self, kp=40, ki=400, ff=128, spoof=False) -> None: + def set_current_gains( + self, + kp: float = 40, + ki: float = 400, + ff: float = 128, + spoof: bool = False, + ) -> None: """ Uses plain current mode, will send 0.0 for position gains in addition to requested current. @@ -448,7 +461,10 @@ def set_current_gains(self, kp=40, ki=400, ff=128, spoof=False) -> None: """ pass - def set_velocity_gains(self, kd=1.0) -> None: + def set_velocity_gains( + self, + kd: float = 1.0, + ) -> None: """ Uses plain speed mode, will send 0.0 for position gain and for feed forward current. diff --git a/opensourceleg/collections/validators.py b/opensourceleg/collections/validators.py index a5e00fb..8b9d272 100644 --- a/opensourceleg/collections/validators.py +++ b/opensourceleg/collections/validators.py @@ -1,31 +1,121 @@ +""" +Module for attribute validation using descriptor classes. + +This module provides an abstract base class `Validator` for creating attribute validators +that can be used as descriptors. It also includes a concrete implementation, `Number`, +which validates that a given value is a number (int or float) and optionally enforces +minimum and/or maximum limits. +""" + from abc import ABC, abstractmethod from typing import Any, Optional, Union class Validator(ABC): - def __set_name__(self, name: str) -> None: + """ + Abstract base class for attribute validators. + + This descriptor class implements the __set_name__, __get__, and __set__ methods to + automatically manage a private attribute name and to perform validation when setting + an attribute's value. Subclasses must implement the validate() method. + """ + + def __set_name__(self, owner: Any, name: str) -> None: + """ + Automatically called to set the name of the attribute. + + This method is invoked at class creation time to assign a private name + to the attribute being managed by the validator. + + Args: + owner (Any): The owner class where the descriptor is defined. + name (str): The name of the attribute. + """ self.private_name = f"_{name}" def __get__(self, instance: Any, objtype: Any = None) -> Any: + """ + Retrieve the value of the managed attribute. + + Args: + instance (Any): The instance from which the attribute is accessed. + objtype (Any, optional): The type of the owner class. Defaults to None. + + Returns: + Any: The current value of the attribute. + """ return getattr(instance, self.private_name) def __set__(self, instance: Any, value: Any) -> None: + """ + Set the value of the managed attribute after validating it. + + The method calls the validate() function to ensure that the provided value meets + the criteria defined by the validator. If valid, the value is stored under the + private attribute name. + + Args: + instance (Any): The instance on which the attribute is being set. + value (Any): The new value to set. + + Raises: + Any: Any exception raised by the validate() method. + """ self.validate(value) setattr(instance, self.private_name, value) @abstractmethod def validate(self, value: Any) -> None: + """ + Validate the value being assigned to the attribute. + + Subclasses must implement this method to enforce specific validation rules. + + Args: + value (Any): The value to validate. + + Raises: + Exception: If the value does not meet the validation criteria. + """ pass class Number(Validator): + """ + Validator for numerical values. + + Ensures that the value is an integer or a float and optionally enforces a minimum + and/or maximum limit. + """ + def __init__( - self, min_value: Optional[Union[int, float]] = None, max_value: Optional[Union[int, float]] = None + self, + min_value: Optional[Union[int, float]] = None, + max_value: Optional[Union[int, float]] = None, ) -> None: + """ + Initialize the Number validator. + + Args: + min_value (Optional[Union[int, float]]): The minimum acceptable value (inclusive). + Defaults to None, meaning no lower bound. + max_value (Optional[Union[int, float]]): The maximum acceptable value (inclusive). + Defaults to None, meaning no upper bound. + """ self.min_value: Optional[Union[int, float]] = min_value self.max_value: Optional[Union[int, float]] = max_value def validate(self, value: Union[int, float]) -> None: + """ + Validate that the value is a number and within the specified range. + + Args: + value (Union[int, float]): The number to validate. + + Raises: + TypeError: If the value is not an int or float. + ValueError: If the value is less than min_value or greater than max_value. + """ if not isinstance(value, (int, float)): raise TypeError("Value must be an int or float") @@ -37,11 +127,25 @@ def validate(self, value: Union[int, float]) -> None: if __name__ == "__main__": + # Example usage demonstrating the Number validator. class Gains: + """ + Example class using the Number validator to validate gain values. + + The attribute 'kp' is validated to ensure it is a number between 0 and 100. + """ + kp = Number(0, 100) - def __init__(self, price: int) -> None: - self.kp = price + def __init__(self, kp_value: int) -> None: + """ + Initialize the Gains instance. + + Args: + kp_value (int): The gain value to be validated and assigned to 'kp'. + """ + self.kp = kp_value + # This instantiation will raise a ValueError because 200 is greater than the allowed maximum of 100. g = Gains(200) diff --git a/opensourceleg/control/compiled_controller.py b/opensourceleg/control/compiled_controller.py index fcc169c..33b8af2 100644 --- a/opensourceleg/control/compiled_controller.py +++ b/opensourceleg/control/compiled_controller.py @@ -15,21 +15,19 @@ class CompiledController: You can define these input and output structures however you please. See examples folder of repo for examples. - Parameters: - ----------- + Args: library_name (string): The name of the compiled library file, without the *.so library_path (string): The path to the directory containing the library. See examples for how to get working directory of parent script. main_function_name (string): Name of the main function to call within the library. - This is the function that will get called via the run() method - initialization_function_name (string): Name of an initialization function for your library. - This gets called only once when the library is loaded. If you don't have an initialization function, pass None. + This is the function that will get called via the run() method initialization_function_name (string): Name + of an initialization function for your library. This gets called only once when the library is loaded. If + you don't have an initialization function, pass None. cleanup_function_name (string): Name of a cleanup function for your library. This gets called when the CompiledController class has gone out of scope and is garbage collected. Again, pass None if you don't need this functionality. Authors: - -------- Kevin Best, Senthur Raj Ayyappan Neurobionics Lab Robotics Department diff --git a/opensourceleg/logging/decorators.py b/opensourceleg/logging/decorators.py index ef9c2a0..ccfb907 100644 --- a/opensourceleg/logging/decorators.py +++ b/opensourceleg/logging/decorators.py @@ -1,3 +1,17 @@ +""" +Module providing decorators to mark functions as deprecated. + +This module contains three decorators that can be used to mark functions as deprecated, +optionally providing a suggested alternative or automatically routing calls to an alternative +function. When a deprecated function is called, a warning is logged using the LOGGER from +opensourceleg.logging. + +Decorators: + deprecated: Marks a function as deprecated. + deprecated_with_suggestion: Marks a function as deprecated and suggests an alternative. + deprecated_with_routing: Marks a function as deprecated and routes calls to an alternative function. +""" + from functools import wraps from typing import Any, Callable @@ -6,7 +20,16 @@ def deprecated(func: Callable) -> Callable: """ - Decorator to mark a function as deprecated. + Decorator that marks a function as deprecated. + + When the decorated function is called, a warning is logged indicating that the function + is deprecated, and then the original function is executed with the provided arguments. + + Args: + func (Callable): The function to be marked as deprecated. + + Returns: + Callable: A wrapped version of the original function that logs a deprecation warning. """ @wraps(func) @@ -19,7 +42,16 @@ def wrapper(*args: Any, **kwargs: Any) -> Any: def deprecated_with_suggestion(alternative_func: Callable) -> Callable: """ - Decorator to provide an alternative function for a deprecated function. + Decorator factory that marks a function as deprecated and suggests an alternative function. + + When the decorated function is called, a warning is logged indicating that the function is deprecated + and suggesting that the alternative function should be used instead. The deprecated function is then executed. + + Args: + alternative_func (Callable): The alternative function that should be used instead. + + Returns: + Callable: A decorator which, when applied to a function, marks it as deprecated with a suggestion. """ def decorator(func: Callable) -> Callable: @@ -37,8 +69,17 @@ def wrapper(*args: Any, **kwargs: Any) -> Any: def deprecated_with_routing(alternative_func: Callable) -> Callable: """ - Decorator to provide an alternative function for a deprecated function. The alternative function will be called - instead of the deprecated function. + Decorator factory that marks a function as deprecated and automatically routes calls to an alternative function. + + When the decorated function is called, a warning is logged indicating that the function is deprecated and + that the alternative function will be called automatically instead. The alternative function is then invoked + with the provided arguments. + + Args: + alternative_func (Callable): The function that will be called instead of the deprecated function. + + Returns: + Callable: A decorator which, when applied to a function, replaces its behavior with that of the alternative. """ def decorator(func: Callable) -> Callable: diff --git a/opensourceleg/logging/exceptions.py b/opensourceleg/logging/exceptions.py index f196b04..4aee45b 100644 --- a/opensourceleg/logging/exceptions.py +++ b/opensourceleg/logging/exceptions.py @@ -1,39 +1,58 @@ class ActuatorStreamException(Exception): - """Actuator Stream Exception + """Exception raised when an actuator is not streaming. - Attributes - ---------- - message (str): Error message + This exception indicates that a command was attempted on an actuator that is not + currently streaming data. + Attributes: + message (str): Explanation of the error. """ def __init__(self, tag: str) -> None: + """Initialize the ActuatorStreamException. + + Args: + tag (str): The identifier or tag of the actuator. + """ super().__init__(f"{tag} is not streaming, please call start() method before sending commands") class ActuatorConnectionException(Exception): - """Actuator Connection Exception + """Exception raised when an actuator is not connected. - Attributes - ---------- - message (str): Error message + This exception is used to indicate that a connection to the actuator could not be + established. + Attributes: + message (str): Explanation of the error. """ def __init__(self, tag: str) -> None: + """Initialize the ActuatorConnectionException. + + Args: + tag (str): The identifier or tag of the actuator. + """ super().__init__(f"{tag} is not connected") class ActuatorIsNoneException(Exception): - """Actuator Connection Exception + """Exception raised when an actuator instance is None in a given mode. - Attributes - ---------- - message (str): Error message + This exception is raised when an actuator instance is expected but found to be None, + indicating that the actuator instance should be passed during initialization or set via + a designated method. + Attributes: + message (str): Explanation of the error. """ def __init__(self, mode: str) -> None: + """Initialize the ActuatorIsNoneException. + + Args: + mode (str): The mode during which the actuator instance was found to be None. + """ super().__init__( f"Actuator is None in {mode} mode, please pass the actuator instance to the mode during " "initialization or set the actuator instance using set_actuator method." @@ -41,53 +60,64 @@ def __init__(self, mode: str) -> None: class ControlModeException(Exception): - """Control Mode Exception + """Exception raised when an attribute cannot be set due to an incorrect control mode. - Attributes - ---------- MOTOR_COUNT_PER_REV: float = 16384 - NM_PER_AMP: float = 0.1133 - IMPEDANCE_A: float = 0.00028444 - IMPEDANCE_C: float = 0.0007812 - MAX_CASE_TEMPERATURE: float = 80 - M_PER_SEC_SQUARED_ACCLSB: float = 9.80665 / 8192 - message (str): Error message + This exception indicates that a certain attribute cannot be configured because the actuator + is not in the appropriate control mode. + Attributes: + message (str): Explanation of the error. """ - def __init__( - self, - tag: str, - attribute: str, - mode: str, - ) -> None: + def __init__(self, tag: str, attribute: str, mode: str) -> None: + """Initialize the ControlModeException. + + Args: + tag (str): The identifier or tag of the actuator. + attribute (str): The attribute that could not be set. + mode (str): The control mode in which the attribute cannot be set. + """ super().__init__( f"[{tag}] Cannot set {attribute} in {mode} mode. Please set the actuator to {attribute} mode first." ) class VoltageModeMissingException(Exception): - """Voltage Mode Missing Exception + """Exception raised when an actuator is missing a voltage mode. - Attributes - ---------- - message (str): Error message + This exception is used to indicate that an actuator does not have a voltage mode + configured when one is required. + Attributes: + message (str): Explanation of the error. """ def __init__(self, tag: str) -> None: + """Initialize the VoltageModeMissingException. + + Args: + tag (str): The identifier or tag of the actuator. + """ super().__init__(f"{tag} must have a voltage mode") class ActuatorKeyException(Exception): - """Actuator Key Error + """Exception raised when a required key is missing from the actuator's dictionary. - Attributes - ---------- - message (str): Error message + This exception indicates that a specific key was not found in the actuator dictionary, + which is necessary for proper operation. + Attributes: + message (str): Explanation of the error. """ def __init__(self, tag: str, key: str) -> None: + """Initialize the ActuatorKeyException. + + Args: + tag (str): The identifier or tag of the actuator. + key (str): The missing key in the actuator dictionary. + """ super().__init__( f"{tag} does not have {key} key in the actuators dictionary. " f"Please check the actuators dictionary for the `{key}` key." diff --git a/opensourceleg/logging/logger.py b/opensourceleg/logging/logger.py index 831a6fa..fb925d5 100644 --- a/opensourceleg/logging/logger.py +++ b/opensourceleg/logging/logger.py @@ -10,8 +10,8 @@ - `LogLevel`: Enum class that defines the log levels supported by the `Logger` class. - `Logger`: Logs attributes of class instances to a CSV file. It supports -setting different logging levels for file and stream handlers. -- `LOGGER`: Global instance of the `Logger` class that can be used throughout + setting different logging levels for file and stream handlers. +- `LOGGER`: Global instance of the `Logger` class that can be used throughout. Usage Guide: @@ -21,7 +21,6 @@ 3. Add class instances and attributes to log using the `track_variable` method. 4. Start logging data using the `update` method. 5. PLEASE call the `close` method before exiting the program to ensure all data is written to the log file. - """ import csv @@ -38,20 +37,14 @@ class LogLevel(Enum): """ - Enumerates the possible log levels. + Enum for log levels used by the Logger class. Attributes: - DEBUG (int): Debug log level - INFO (int): Info log level - WARNING (int): Warning log level - ERROR (int): Error log level - CRITICAL (int): Critical log level - - Examples: - >>> LogLevel.DEBUG - 10 - >>> LogLevel.INFO - 20 + DEBUG: Detailed information, typically of interest only when diagnosing problems. + INFO: Confirmation that things are working as expected. + WARNING: An indication that something unexpected happened. + ERROR: A more serious problem, the software has not been able to perform some function. + CRITICAL: A serious error, indicating that the program itself may be unable to continue running. """ DEBUG = logging.DEBUG @@ -108,12 +101,17 @@ class Logger(logging.Logger): >>> logger.track_variable(lambda: 42, "answer") >>> logger.update() >>> logger.flush_buffer() - """ _instance = None - def __new__(cls, *args, **kwargs): + def __new__(cls, *args: Any, **kwargs: Any) -> "Logger": + """ + Ensure that only one instance of Logger is created (singleton pattern). + + Returns: + Logger: The singleton Logger instance. + """ if cls._instance is None: cls._instance = super().__new__(cls) else: @@ -132,17 +130,19 @@ def __init__( buffer_size: int = 1000, ) -> None: """ - Initialize the custom logger with the specified configuration. + Initialize the Logger instance. + + Sets up logging paths, format, handler levels, and internal buffers for tracking variables. Args: - log_path: The path to save log files. - log_format: The log message format. - file_level: The log level for file output. - stream_level: The log level for console output. - file_max_bytes: The maximum size of the log file in bytes before rotation. - file_backup_count: The number of backup log files to keep. - file_name: The base name for the log file. - buffer_size: The maximum number of log entries to buffer before writing to the CSV file. + log_path (str): Directory path where log files will be stored. + log_format (str): Format string for log messages. + file_level (LogLevel): Logging level for file handler. + stream_level (LogLevel): Logging level for stream (console) handler. + file_max_bytes (int): Maximum size (in bytes) for log file rotation. + file_backup_count (int): Number of backup log files to keep. + file_name (Union[str, None]): Optional user-specified file name prefix. + buffer_size (int): Maximum number of log records to buffer before writing to CSV. """ if not hasattr(self, "_initialized"): super().__init__(__name__) @@ -180,10 +180,14 @@ def __init__( self._log_path = log_path def _setup_logging(self) -> None: + """ + Set up the stream logging handler. + + Configures the logger level, formatter, and attaches a stream handler for console output. + """ if not hasattr(self, "_stream_handler"): # Prevent duplicate handlers self.setLevel(level=self._file_level.value) self._std_formatter = logging.Formatter(self._log_format) - self._stream_handler = logging.StreamHandler() self._stream_handler.setLevel(level=self._stream_level.value) self._stream_handler.setFormatter(fmt=self._std_formatter) @@ -204,7 +208,7 @@ def _setup_file_handler(self) -> None: self._file_handler.setFormatter(fmt=self._std_formatter) self.addHandler(hdlr=self._file_handler) - def _ensure_file_handler(self): + def _ensure_file_handler(self) -> None: if not hasattr(self, "_file_handler"): self._setup_file_handler() @@ -225,7 +229,6 @@ def track_variable(self, var_func: Callable[[], Any], name: str) -> None: >>> LOGGER.update() >>> LOGGER.flush_buffer() """ - var_id = id(var_func) self._tracked_vars[var_id] = var_func self._var_names[var_id] = name @@ -252,6 +255,12 @@ def untrack_variable(self, var_func: Callable[[], Any]) -> None: self._var_names.pop(var_id, None) def __repr__(self) -> str: + """ + Return a string representation of the Logger instance. + + Returns: + str: A string representation including the current file path. + """ return f"Logger(file_path={self._file_path})" def set_file_name(self, file_name: Union[str, None]) -> None: @@ -271,8 +280,8 @@ def set_file_name(self, file_name: Union[str, None]) -> None: file_name = file_name.split(".")[0] self._user_file_name = file_name - self._file_path = "" - self._csv_path = "" + self._file_path = os.path.join(self._log_path, f"{file_name}.log") + self._csv_path = os.path.join(self._log_path, f"{file_name}.csv") def set_file_level(self, level: LogLevel) -> None: """ @@ -362,7 +371,10 @@ def update(self) -> None: def flush_buffer(self) -> None: """ - Write the buffered log entries to the CSV file. + Flush the buffered log data to the CSV file. + + Ensures that the file handler is available, writes the header if not yet written, + writes all buffered rows to the CSV, clears the buffer, and flushes the file. """ if not self._buffer: return @@ -371,22 +383,31 @@ def flush_buffer(self) -> None: if self._file is None: self._file = open(self._csv_path, "w", newline="") - self._writer = csv.writer(self._file) + self._writer = csv.writer(self._file) # type: ignore[assignment] if not self._header_written: self._write_header() - self._writer.writerows(self._buffer) + self._writer.writerows(self._buffer) # type: ignore[attr-defined] self._buffer.clear() self._file.flush() def _write_header(self) -> None: + """ + Write the CSV header based on tracked variable names. + This header is written only once per log file. + """ header = list(self._var_names.values()) - - self._writer.writerow(header) # type: ignore[assignment] + self._writer.writerow(header) # type: ignore[attr-defined] self._header_written = True def _generate_file_paths(self) -> None: + """ + Generate file paths for the log and CSV files based on the current settings. + + Creates the log directory if it does not exist, and uses the current timestamp + (and optionally a user-specified name) to generate unique file names. + """ now = datetime.now() timestamp = now.strftime("%Y%m%d_%H%M%S") script_name = os.path.basename(__file__).split(".")[0] @@ -398,16 +419,35 @@ def _generate_file_paths(self) -> None: self._csv_path = file_path + ".csv" def __enter__(self) -> "Logger": + """ + Enter the runtime context related to this Logger instance. + + Returns: + Logger: The current Logger instance. + """ return self - def __exit__(self, exc_type, exc_val, exc_tb) -> None: + def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: + """ + Exit the runtime context and close the Logger. + + Args: + exc_type (Any): The exception type if an exception occurred. + exc_val (Any): The exception value if an exception occurred. + exc_tb (Any): The traceback if an exception occurred. + """ self.close() def reset(self) -> None: """ - Reset the logger state. + Reset the Logger. + + Closes the current file, reinitializes the logging handlers, clears tracked variables, + and resets header status. """ - self._buffer.clear() + self.close() + self._setup_logging() + self._tracked_vars.clear() self._var_names.clear() self._header_written = False @@ -417,6 +457,9 @@ def reset(self) -> None: def close(self) -> None: """ + Flush any buffered log data and close the CSV file. + + This method should be called before the program exits to ensure all data is written. Close the logger and flush any remaining log entries. Examples: @@ -429,64 +472,158 @@ def close(self) -> None: self._file = None self._writer = None - def debug(self, msg, *args, **kwargs): + def debug(self, msg: object, *args: object, **kwargs: Any) -> None: + """ + Log a debug message. + + Ensures that the file handler is set up before logging. + + Args: + msg (object): The message to log. + *args (object): Additional arguments. + **kwargs (Any): Additional keyword arguments. + """ self._ensure_file_handler() super().debug(msg, *args, **kwargs) - def info(self, msg, *args, **kwargs): + def info(self, msg: object, *args: object, **kwargs: Any) -> None: + """ + Log an info message. + + Ensures that the file handler is set up before logging. + + Args: + msg (object): The message to log. + *args (object): Additional arguments. + **kwargs (Any): Additional keyword arguments. + """ self._ensure_file_handler() super().info(msg, *args, **kwargs) - def warning(self, msg, *args, **kwargs): + def warning(self, msg: object, *args: object, **kwargs: Any) -> None: + """ + Log a warning message. + + Ensures that the file handler is set up before logging. + + Args: + msg (object): The message to log. + *args (object): Additional arguments. + **kwargs (Any): Additional keyword arguments. + """ self._ensure_file_handler() super().warning(msg, *args, **kwargs) - def error(self, msg, *args, **kwargs): + def error(self, msg: object, *args: object, **kwargs: Any) -> None: + """ + Log an error message. + + Ensures that the file handler is set up before logging. + + Args: + msg (object): The message to log. + *args (object): Additional arguments. + **kwargs (Any): Additional keyword arguments. + """ self._ensure_file_handler() super().error(msg, *args, **kwargs) - def critical(self, msg, *args, **kwargs): + def critical(self, msg: object, *args: object, **kwargs: Any) -> None: + """ + Log a critical message. + + Ensures that the file handler is set up before logging. + + Args: + msg (object): The message to log. + *args (object): Additional arguments. + **kwargs (Any): Additional keyword arguments. + """ self._ensure_file_handler() super().critical(msg, *args, **kwargs) - def log(self, level, msg, *args, **kwargs): + def log(self, level: int, msg: object, *args: object, **kwargs: Any) -> None: + """ + Log a message with a specific log level. + + Ensures that the file handler is set up before logging. + + Args: + level (int): The log level. + msg (object): The message to log. + *args (object): Additional arguments. + **kwargs (Any): Additional keyword arguments. + """ self._ensure_file_handler() super().log(level, msg, *args, **kwargs) @property - def file_path(self) -> str: + def file_path(self) -> Optional[str]: """ - Get the path to the log file. + Get the current file path for the log file. + + Returns: + Optional[str]: The file path as a string, or None if not set. """ - if self._file_path == "": - self._generate_file_paths() return self._file_path + @property + def csv_path(self) -> Optional[str]: + """ + Get the current file path for the CSV file. + + Returns: + Optional[str]: The CSV file path as a string, or None if not set. + """ + return self._csv_path + + @property + def log_path(self) -> str: + """ + Get the log directory path. + + Returns: + str: The directory path where log files are stored. + """ + return self._log_path + @property def buffer_size(self) -> int: """ - Get the maximum number of log entries to buffer before writing to the CSV file. + Get the current buffer size. + + Returns: + int: The maximum number of log records held in the buffer. """ return self._buffer_size @property def file_level(self) -> LogLevel: """ - Get the log level for file output (.log). + Get the current file logging level. + + Returns: + LogLevel: The logging level for the file handler. """ return self._file_level @property def stream_level(self) -> LogLevel: """ - Get the log level for console output. + Get the current stream (console) logging level. + + Returns: + LogLevel: The logging level for the stream handler. """ return self._stream_level @property def file_max_bytes(self) -> int: """ - Get the maximum size of the log file in bytes before rotation. + Get the maximum number of bytes for the log file before rotation. + + Returns: + int: The maximum file size in bytes. """ return self._file_max_bytes @@ -494,6 +631,9 @@ def file_max_bytes(self) -> int: def file_backup_count(self) -> int: """ Get the number of backup log files to keep. + + Returns: + int: The backup count. """ return self._file_backup_count diff --git a/opensourceleg/math/math.py b/opensourceleg/math/math.py index ad684b0..f497ae0 100644 --- a/opensourceleg/math/math.py +++ b/opensourceleg/math/math.py @@ -17,16 +17,16 @@ class ThermalModel: 1: C_w * dT_w/dt = (I^2)R + (T_c-T_w)/R_WC 2: C_c * dT_c/dt = (T_w-T_c)/R_WC + (T_w-T_a)/R_CA - where: - C_w: Thermal capacitance of the winding - C_c: Thermal capacitance of the case - R_WC: Thermal resistance between the winding and the case - R_CA: Thermal resistance between the case and the ambient - T_w: Temperature of the winding - T_c: Temperature of the case - T_a: Temperature of the ambient - I: Current - R: Resistance + where: + C_w: Thermal capacitance of the winding + C_c: Thermal capacitance of the case + R_WC: Thermal resistance between the winding and the case + R_CA: Thermal resistance between the case and the ambient + T_w: Temperature of the winding + T_c: Temperature of the case + T_a: Temperature of the ambient + I: Current + R: Resistance Implementation: 1: The model is updated at every time step with the current and the ambient temperature. diff --git a/opensourceleg/robots/base.py b/opensourceleg/robots/base.py index ff68ebc..69a94a0 100644 --- a/opensourceleg/robots/base.py +++ b/opensourceleg/robots/base.py @@ -1,3 +1,15 @@ +""" +Module for the RobotBase abstract class. + +This module defines an abstract base class, `RobotBase`, that provides a template for a robot +system integrating actuators and sensors. The class supports context management and defines +abstract methods for starting, stopping, and updating the robot's components. + +Type Parameters: + TActuator: A type that must be a subclass of ActuatorBase. + TSensor: A type that must be a subclass of SensorBase. +""" + from abc import ABC, abstractmethod from typing import Any, Generic, TypeVar @@ -10,25 +22,80 @@ class RobotBase(ABC, Generic[TActuator, TSensor]): + """ + Abstract base class representing a robot composed of actuators and sensors. + + This class provides the basic structure for a robot, including methods to start, stop, + and update its components. It also supports context management so that the robot + can be used within a with-statement to automatically start and stop its components. + + Attributes: + actuators (dict[str, TActuator]): A dictionary mapping actuator names to actuator instances. + sensors (dict[str, TSensor]): A dictionary mapping sensor names to sensor instances. + """ + def __init__( self, tag: str, actuators: dict[str, TActuator], sensors: dict[str, TSensor], ) -> None: + """ + Initialize the RobotBase instance. + + Args: + tag (str): A unique identifier for the robot. + actuators (dict[str, TActuator]): A dictionary of actuators keyed by their names. + sensors (dict[str, TSensor]): A dictionary of sensors keyed by their names. + """ self._tag = tag self.actuators: dict[str, TActuator] = actuators self.sensors: dict[str, TSensor] = sensors def __enter__(self) -> "RobotBase": + """ + Enter the runtime context for the robot. + + This method starts all actuators and sensors and returns the robot instance. + + Returns: + RobotBase: The current robot instance. + + Example: + >>> with MyRobot() as robot: + ... robot.update() + """ self.start() return self def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: + """ + Exit the runtime context for the robot. + + This method stops all actuators and sensors. + + Args: + exc_type (Any): The exception type, if an exception occurred. + exc_val (Any): The exception value, if an exception occurred. + exc_tb (Any): The traceback, if an exception occurred. + """ self.stop() @abstractmethod def start(self) -> None: + """ + Start all actuators and sensors. + + For each actuator in the actuators dictionary, a debug message is logged and its start method is called. + Similarly, for each sensor in the sensors dictionary, a debug message is logged and its start method is called. + + Returns: + None + + Example: + >>> robot = MyRobot() + >>> robot.start() + """ for actuator in self.actuators.values(): LOGGER.debug(f"Calling start method of {actuator.tag}") actuator.start() @@ -39,6 +106,21 @@ def start(self) -> None: @abstractmethod def stop(self) -> None: + """ + Stop all actuators and sensors. + + For each actuator in the actuators dictionary, a debug message is logged and its stop method is called. + Similarly, for each sensor in the sensors dictionary, a debug message is logged and its stop method is called. + + Returns: + None + + Example: + >>> robot = MyRobot() + >>> robot.start() + ... # Do something with the robot + >>> robot.stop() + """ for actuator in self.actuators.values(): LOGGER.debug(f"Calling stop method of {actuator.tag}") actuator.stop() @@ -49,6 +131,19 @@ def stop(self) -> None: @abstractmethod def update(self) -> None: + """ + Update all actuators and sensors. + + This method calls the update method for each actuator and sensor to refresh their state. + + Returns: + None + + Example: + >>> robot = MyRobot() + >>> robot.start() + >>> robot.update() + """ for actuator in self.actuators.values(): actuator.update() @@ -57,10 +152,20 @@ def update(self) -> None: @property def tag(self) -> str: + """ + Get the unique identifier (tag) of the robot. + + Returns: + str: The robot's tag. + + Example: + >>> robot = MyRobot() + >>> robot.tag + "my_robot" + """ return self._tag - # You can pipe values from the actuators to custom @property methods - # to get the values from the actuators with dot notation + # Example of piping values from the actuators to custom @property methods: # @property # def knee(self) -> TActuator: # return self.actuators["knee"] diff --git a/opensourceleg/safety/safety.py b/opensourceleg/safety/safety.py index 11d3b9f..d7dca32 100644 --- a/opensourceleg/safety/safety.py +++ b/opensourceleg/safety/safety.py @@ -30,6 +30,10 @@ def is_changing( Returns: Callable: Decorator function. + + Raises: + ValueError: If the property is not changing and no proxy attribute is provided. + """ history_key = f"_{attribute_name}_history" @@ -71,6 +75,9 @@ def is_negative(clamp: bool = False) -> Callable: Returns: Callable: Decorator function. + + Raises: + ValueError: If the property's value is not negative. """ def decorator(func: Callable) -> Callable: @@ -96,6 +103,9 @@ def is_positive(clamp: bool = False) -> Callable: Returns: Callable: Decorator function. + + Raises: + ValueError: If the property's value is not positive. """ def decorator(func: Callable) -> Callable: @@ -121,6 +131,9 @@ def is_zero(clamp: bool = False) -> Callable: Returns: Callable: Decorator function. + + Raises: + ValueError: If the property's value is not zero. """ def decorator(func: Callable) -> Callable: @@ -182,6 +195,9 @@ def is_greater_than(min_value: float, clamp: bool = False, equality: bool = Fals Returns: Callable: Decorator function. + + Raises: + ValueError: If the property's value is less than or equal to the minimum value """ def decorator(func: Callable) -> Callable: @@ -216,6 +232,9 @@ def is_less_than(max_value: float, clamp: bool = False, equality: bool = False) Returns: Callable: Decorator function. + + Raises: + ValueError: If the property's value is greater than or equal to the maximum value """ def decorator(func: Callable) -> Callable: @@ -248,6 +267,9 @@ def custom_criteria(criteria: Callable) -> Callable: Returns: Callable: Decorator function. + + Raises: + ValueError: If the property's value does not meet the custom criteria. """ def decorator(func: Callable) -> Callable: @@ -266,6 +288,16 @@ def wrapper(instance: object, *args: Any, **kwargs: Any) -> Any: class SafetyDecorators: """ Dataclass that contains all safety decorators. + + Attributes: + is_changing: Decorator to check if a property's value is changing. + is_negative: Decorator to check if a property's value is negative. + is_positive: Decorator to check if a property's value is positive. + is_zero: Decorator to check if a property's value is zero. + is_within_range: Decorator to check if a property's value is within a given range. + is_greater_than: Decorator to check if a property's value is greater than a given value. + is_less_than: Decorator to check if a property's value is less than a given value. + custom_criteria: Decorator to check if a property's value meets a custom criteria. """ is_changing = is_changing @@ -299,6 +331,10 @@ def add_safety(self, instance: object, attribute: str, decorator: Callable) -> N instance (object): Object that contains the attribute. attribute (str): Name of the attribute. decorator (Callable): Safety decorator to be applied to the attribute. + + Raises: + AttributeError: If the attribute does not exist in the given object. + Warning: If the attribute is not a property. The SafetyManager only works on properties. """ if not hasattr(instance, attribute): @@ -323,6 +359,12 @@ def add_safety(self, instance: object, attribute: str, decorator: Callable) -> N def start(self) -> None: """ Applies all decorators to the properties of the objects in the safe_objects dictionary. + + Example: + >>> safety_manager = SafetyManager() + >>> safety_manager.add_safety(sensor, "value", SafetyDecorators.is_changing("value")) + >>> safety_manager.add_safety(sensor, "a", SafetyDecorators.is_positive()) + >>> safety_manager.start() """ for container, safe_attributes in self.safe_objects.items(): container_subclass = type(f"{container.__class__.__name__}:SAFE", (container.__class__,), {}) @@ -343,6 +385,9 @@ def start(self) -> None: def update(self) -> None: """ Accesses the properties of the objects in the safe_objects dictionary, thereby triggering the decorators. + + Example: + TODO: Add example """ for container, safe_attributes in self.safe_objects.items(): for attribute_name, _ in safe_attributes.items(): diff --git a/opensourceleg/sensors/adc.py b/opensourceleg/sensors/adc.py index 37172ca..a357a2f 100644 --- a/opensourceleg/sensors/adc.py +++ b/opensourceleg/sensors/adc.py @@ -1,3 +1,11 @@ +""" +Module for interfacing with the ADS131M0x family of ADC chips. + +This module provides a class for communicating with and configuring the ADS131M0x +ADC chips. It supports SPI communication to reset, configure, calibrate, and read data +from the ADC in units of millivolts. +""" + import math from time import sleep from typing import Any, ClassVar, Optional @@ -8,10 +16,39 @@ class ADS131M0x(ADCBase): - """Class used for communication with the ADS131M0x family of ADC chips. + """ + Class used for communication with the ADS131M0x family of ADC chips. This class allows you to configure the ADS131M0x family of chips as well as - read out the ADC values in units of mV. + read out the ADC values in units of millivolts. + + Class Attributes: + _MAX_CHANNELS (int): Maximum number of channels supported. + _BYTES_PER_WORD (int): Number of bytes per word in SPI communication. + _RESOLUTION (int): ADC resolution in bits. + _SPI_MODE (int): SPI mode used for communication. + _BLANK_WORD (ClassVar[list[int]]): Blank word for SPI messages. + _RESET_WORD (ClassVar[list[int]]): SPI command for resetting the ADC. + _STANDBY_WORD (ClassVar[list[int]]): SPI command for putting the ADC in standby. + _WAKEUP_WORD (ClassVar[list[int]]): SPI command for waking up the ADC. + _RREG_PREFIX (int): Prefix for read register commands. + _WREG_PREFIX (int): Prefix for write register commands. + _ID_REG (int): Address of the ID register. + _STATUS_REG (int): Address of the status register. + _MODE_REG (int): Address of the mode register. + _CLOCK_REG (int): Address of the clock register. + _GAIN1_REG (int): Address of the first gain register. + _GAIN2_REG (int): Address of the second gain register. + _CFG_REG (int): Address of the configuration register. + _DISABLE_CHANNELS_CLOCK (int): Value to disable channel clocks. + _ENABLE_CHANNELS_CLOCK (int): Value to enable channel clocks. + _MODE_CFG (int): Mode configuration value. + _OCAL_MSB_ADDRS (ClassVar[list[int]]): List of MSB addresses for offset calibration. + _OCAL_LSB_ADDRS (ClassVar[list[int]]): List of LSB addresses for offset calibration. + _GCAL_MSB_ADDRS (ClassVar[list[int]]): List of MSB addresses for gain calibration. + _GCAL_LSB_ADDRS (ClassVar[list[int]]): List of LSB addresses for gain calibration. + _CHANNEL_CFG_ADDRS (ClassVar[list[int]]): List of addresses for channel configuration. + _GCAL_STEP_SIZE (float): Step size used in gain calibration. """ _MAX_CHANNELS = 8 @@ -60,24 +97,28 @@ def __init__( voltage_reference: float = 1.2, gain_error: Optional[list[int]] = None, ): - """Initializes ADS131M0x class. + """ + Initializes the ADS131M0x instance. + + Validates the provided configuration and sets up internal parameters for SPI communication, + channel gains, and calibration. Args: - - spi_bus(int): Which SPI bus the ADC is connected to. Default: 0 - - spi_chip(int): Which CS signal the ADC is connected to. Default: 0 - - num_channels(int): How many channels are present on the ADC. Default: 6 - - max_speed_hz(int): Maximum clock frequency of the SPI communication. Default: 8192000 - - channel_gains(List[int]): Gains of the programmable gain amplifier for all channels. Default: [32,128] * 3 - - voltage_reference(float): Reference voltage used by the ADC. Default: 1.2 - - gain_error(List[int]): User-calculated integers used for correcting the gain of each channel for - additional precision. Default: [] + spi_bus (int): SPI bus number where the ADC is connected. Default is 0. + spi_chip (int): Chip select (CS) signal number for the ADC. Default is 0. + num_channels (int): Number of active channels on the ADC. Default is 6. + max_speed_hz (int): Maximum SPI clock speed in hertz. Default is 8192000. + channel_gains (list[int]): List of gains for each channel's programmable gain amplifier. + Default is [32, 128] repeated 3 times (for 6 channels). + voltage_reference (float): Reference voltage used by the ADC. Default is 1.2 V. + gain_error (Optional[list[int]]): Optional user-calculated gain error corrections per channel. + Default is an empty list. Raises: - ValueError: If length of channel_gains is not equal to number of channels, or if gain is not a power of 2 - between 1 and 128. - + ValueError: If the length of channel_gains does not equal num_channels, + or if gain_error is provided and its length does not equal num_channels, + or if any gain is not a power of 2 between 1 and 128. """ - if gain_error is None: gain_error = [] if len(channel_gains) != num_channels: @@ -90,7 +131,7 @@ def __init__( self._num_channels = num_channels self._max_speed_hz = max_speed_hz self._gains = [0] * num_channels - for i in range(0, num_channels): + for i in range(num_channels): gain = int(math.log2(channel_gains[i])) if gain != math.log2(channel_gains[i]): raise ValueError("Gain must be a power of 2 between 1 and 128") @@ -109,11 +150,21 @@ def __init__( self._data = [0.0] * num_channels def __repr__(self) -> str: + """ + Return a string representation of the ADS131M0x instance. + + Returns: + str: The string "ADS131M0x". + """ return "ADS131M0x" def start(self) -> None: - """Opens SPI port, calibrates ADC, and begins streaming ADC data.""" + """ + Open the SPI port, reset the ADC, configure gain settings, and begin streaming ADC data. + This method initializes the SPI communication, resets the device, sets the channel gains, + transitions the device to continuous conversion mode, and clears any stale data. + """ self._spi.open(self._spi_bus, self._spi_chip) self._spi.max_speed_hz = self._max_speed_hz self._spi.mode = self._SPI_MODE @@ -124,45 +175,66 @@ def start(self) -> None: self._clear_stale_data() def stop(self) -> None: - """Stop streaming ADC data and close SPI port.""" + """ + Stop streaming ADC data and close the SPI port. + + This method transitions the ADC to standby mode and closes the SPI connection. + """ self._set_device_state(0) self._spi.close() def reset(self) -> None: - """Resets all register values.""" + """ + Reset all ADC register values. + + Sends a reset command followed by blank words to initialize the device registers. + """ self._spi.xfer2(self._RESET_WORD + self._BLANK_WORD * (self._words_per_frame - 1)) def update(self) -> None: - """Reads ADC data.""" + """ + Read ADC data from the device. + + Waits until the ADC channels are ready to be read and then updates the internal data + with the latest voltage values in millivolts. + """ while not self._ready_to_read(): sleep(0.001) self._data = self._read_data_millivolts() def calibrate(self) -> None: - """Performs offset and gain calibration.""" + """ + Perform offset and gain calibration on the ADC. + + This method performs an offset calibration (zeroing) and, if gain error corrections + are provided, performs a gain calibration. + """ self._offset_calibration() if self._gain_error is not None: self._gain_calibration() def read_register(self, address: int) -> int: - """Read value at register located at specified address. + """ + Read the value stored in the register at the specified address. Args: - - address(int): Address of the register to be read. + address (int): The address of the register to read. + Returns: - Value stored at register located at address. + int: The value stored at the register. """ msg = (address << 7) | (self._RREG_PREFIX << 13) word = self._message_to_word(msg) rsp = self._spi_message(word) - return (int)(rsp[0] << 8 | rsp[1]) + return int(rsp[0] << 8 | rsp[1]) def write_register(self, address: int, reg_val: int) -> None: - """Writes specific value to register located at designated address. + """ + Write a specific value to the register at the designated address. Args: - - address(int): Address of the register to be written. - - reg_val(int): Value to be written to register at address. + address (int): The address of the register to write. + reg_val (int): The value to be written to the register. """ addr_msg = (address << 7) | (self._WREG_PREFIX << 13) addr_bytes = self._message_to_word(addr_msg) @@ -171,45 +243,67 @@ def write_register(self, address: int, reg_val: int) -> None: @property def is_streaming(self) -> bool: + """ + Check if the ADC is currently streaming data. + + Returns: + bool: True if streaming; otherwise, False. + """ return self._streaming @property def gains(self) -> list[int]: + """ + Get the gain settings (in logarithmic scale) for each ADC channel. + + Returns: + list[int]: A list of gain values for each channel. + """ return self._gains @property def data(self) -> Any: + """ + Get the latest ADC data. + + Returns: + Any: The list of voltage readings (in millivolts) for each channel. + """ return self._data def _spi_message(self, message: list[int]) -> list[int]: - """Send SPI message to ADS131M0x. + """ + Send an SPI message to the ADS131M0x and read the response. Args: - - message(List[int]): message to be sent to the ADS131M0x separated into message. + message (list[int]): The message to be sent, split into individual bytes. + Returns: - The response to the message sent, including the entire frame following the response. + list[int]: The response from the device, representing the entire frame. """ self._spi.xfer2(message) - return (list[int])(self._spi.readbytes(self._BYTES_PER_WORD * self._words_per_frame)) + return list(self._spi.readbytes(self._BYTES_PER_WORD * self._words_per_frame)) def _channel_enable(self, state: bool) -> None: - """Enables or disables streaming on all channels. + """ + Enable or disable streaming on all channels. - Arg: - - state(bool): sets whether or not the ADC is streaming + Args: + state (bool): If True, enables the channel clocks; if False, disables them. """ - if state is True: + if state: self.write_register(self._CLOCK_REG, self._ENABLE_CHANNELS_CLOCK) - elif state is False: + else: self.write_register(self._CLOCK_REG, self._DISABLE_CHANNELS_CLOCK) def _set_device_state(self, state: int) -> None: - """Sets state of internal state machine. + """ + Set the internal state of the ADC device. Args: - state(int): Device state - 0 -- Standby - 1 -- Continuous Conversion Mode + state (int): The desired state: + 0 -- Standby mode. + 1 -- Continuous Conversion Mode. """ if state == 0: self._spi.xfer2(self._STANDBY_WORD + self._BLANK_WORD * (self._words_per_frame - 1)) @@ -219,25 +313,35 @@ def _set_device_state(self, state: int) -> None: self._streaming = True def _set_voltage_source(self, source: int) -> None: - """Changes voltage source for ADC input. + """ + Change the voltage source for the ADC input. + Args: - source(int): Selects which voltage source to use for ADC data. - 0 -- external input - 1 -- shorts differential pairs for value of ~0 - 2 -- positive internal test signal ((160mV / gain) * (Vref / 1.25)) - 3 -- negative internal test signal ((-160mV / gain) * (Vref / 1.25)) + source (int): The voltage source selection: + 0 -- external input. + 1 -- shorts differential pairs for a value near 0. + 2 -- positive internal test signal ((160mV / gain) * (Vref / 1.25)). + 3 -- negative internal test signal ((-160mV / gain) * (Vref / 1.25)). """ - for i in range(0, self._num_channels): + for i in range(self._num_channels): self.write_register(self._CHANNEL_CFG_ADDRS[i], source) def _clear_stale_data(self) -> None: - """Clears stale ADC values so the next read will be accurate.""" + """ + Clear stale ADC values from the device. + + Reads and discards two frames of ADC data to ensure the next read returns current values. + """ for _ in range(2): self._read_data_millivolts() def _set_gain(self) -> None: - """Set PGA gain for each channel of ADC.""" + """ + Set the programmable gain amplifier (PGA) gain for each ADC channel. + This method configures the gain registers for both groups of channels and enables + the channel clocks. + """ gains = self._gains + [0] * (self._MAX_CHANNELS - len(self._gains)) self._channel_enable(False) gains_msg = gains[3] << 12 | gains[2] << 8 | gains[1] << 4 | gains[0] @@ -247,66 +351,104 @@ def _set_gain(self) -> None: self._channel_enable(True) def _offset_calibration(self) -> None: - """Centers the ADC data around the measured zero value.""" + """ + Perform offset calibration of the ADC. + + This method centers the ADC data around the measured zero value by setting + the appropriate offset registers for each channel. + """ self._set_voltage_source(1) self._clear_stale_data() num_data_points = 1000 - offsets = [[0] * self._num_channels] * num_data_points + offsets = [[0] * self._num_channels for _ in range(num_data_points)] for i in range(num_data_points): offsets[i] = self._read_data_counts() offset_avg = [int(sum(values) / len(values)) for values in zip(*offsets)] - for i in range(0, self._num_channels): + for i in range(self._num_channels): self.write_register(self._OCAL_MSB_ADDRS[i], offset_avg[i] >> 8) self.write_register(self._OCAL_LSB_ADDRS[i], (offset_avg[i] << 8) & 0xFF00) self._set_voltage_source(0) def _gain_calibration(self) -> None: - """Corrects actual gain to desired gain using user-calculated gain error for each channel.""" + """ + Perform gain calibration of the ADC. + + Adjusts the actual gain to match the desired gain using user-calculated gain error + values for each channel. + """ for i in range(self._num_channels): gain_correction = (1 + self._gain_error[i]) / self._GCAL_STEP_SIZE - self.write_register(self._GCAL_MSB_ADDRS[i], (int)(gain_correction) >> 8) + self.write_register(self._GCAL_MSB_ADDRS[i], int(gain_correction) >> 8) def _message_to_word(self, msg: int) -> list[int]: - """Separates message into bytes to be sent to ADC.""" + """ + Convert an integer message into a list of bytes (word) for SPI transmission. + + Args: + msg (int): The integer message to convert. + + Returns: + list[int]: A list containing the individual bytes of the message. + """ word = [0] * 3 word[0] = (msg >> 8) & 0xFF word[1] = msg & 0xFF return word def _ready_to_read(self) -> bool: - """Returns true if all ADC channels are ready to be read.""" + """ + Check if all ADC channels are ready for a new data read. + + Returns: + bool: True if the status register indicates readiness; otherwise, False. + """ reply = self.read_register(self._STATUS_REG) return reply == self._ready_status def _read_data_millivolts(self) -> list[float]: - """Returns a List representing the voltage in millivolts for all channels of the ADC""" + """ + Read and convert ADC data into millivolt values for each channel. + + Returns: + list[float]: A list of voltage readings (in mV) for each ADC channel. + """ mV = [ 1000 * ((dat) / (2 ** (self._RESOLUTION - 1)) * self._voltage_reference) for dat in self._read_data_counts() ] return mV def _read_data_counts(self) -> list[int]: - """Returns signed ADC value from -2^23 -> 2^23""" + """ + Read raw ADC counts from the device. + + Reads the signed ADC values (in counts) for each channel. The counts are represented + in two's complement format over a 24-bit resolution. + + Returns: + list[int]: A list of signed ADC counts for each channel. + """ reply = self._spi.readbytes(self._BYTES_PER_WORD * self._words_per_frame) val = [0] * self._num_channels for byte in range(3, self._num_channels * 3 + 1, 3): - val[int(((byte) / 3) - 1)] = self._twos_complement( - ((reply[byte] << 16) | (reply[byte + 1] << 8) | reply[byte + 2]), + index = int(byte / 3) - 1 + val[index] = self._twos_complement( + (reply[byte] << 16) | (reply[byte + 1] << 8) | reply[byte + 2], self._RESOLUTION, ) return val - def _twos_complement( - self, - num: int, - bits: int, - ) -> int: - """Takes in a number and the number of bits used to represent them, then converts the number to twos complement + def _twos_complement(self, num: int, bits: int) -> int: + """ + Convert an unsigned integer to a signed integer using two's complement representation. + Args: - num(int): number to be converted - bits(int): number of bits that represent num + num (int): The unsigned integer. + bits (int): The number of bits used to represent the number. + + Returns: + int: The signed integer value. """ val = num - if (num >> (bits - 1)) != 0: # if sign bit is set e.g. + if (num >> (bits - 1)) != 0: # if sign bit is set val = num - (1 << bits) # compute negative value return val diff --git a/opensourceleg/sensors/base.py b/opensourceleg/sensors/base.py index 57049c7..e2882f2 100644 --- a/opensourceleg/sensors/base.py +++ b/opensourceleg/sensors/base.py @@ -4,7 +4,19 @@ class SensorNotStreamingException(Exception): + """ + Exception raised when an operation is attempted on a sensor that is not streaming. + + This exception indicates that the sensor is not actively streaming data. + """ + def __init__(self, sensor_name: str = "Sensor") -> None: + """ + Initialize the SensorNotStreamingException. + + Args: + sensor_name (str, optional): The name or identifier of the sensor. Defaults to "Sensor". + """ super().__init__( f"{sensor_name} is not streaming, please ensure that the connections are intact, " f"power is on, and the start method is called." @@ -12,6 +24,18 @@ def __init__(self, sensor_name: str = "Sensor") -> None: def check_sensor_stream(func: Callable) -> Callable: + """ + Decorator to ensure that a sensor is streaming before executing the decorated method. + + If the sensor is not streaming, a SensorNotStreamingException is raised. + + Args: + func (Callable): The sensor method to be wrapped. + + Returns: + Callable: The wrapped method that checks streaming status before execution. + """ + @wraps(func) def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any: # TODO: This could be a generic type that points to actuator, sensor, etc. @@ -23,160 +47,385 @@ def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any: class SensorBase(ABC): + """ + Abstract base class for sensors. + + Defines the common interface for sensors including starting, stopping, + updating, and streaming status. + """ + def __repr__(self) -> str: + """ + Return a string representation of the sensor. + + Returns: + str: A string identifying the sensor class. + """ return "SensorBase" @property @abstractmethod def data(self) -> Any: + """ + Get the sensor data. + + Returns: + Any: The current data from the sensor. + """ pass @abstractmethod def start(self) -> None: + """ + Start the sensor streaming. + + Implementations should handle initializing the sensor and beginning data acquisition. + """ pass @abstractmethod def stop(self) -> None: + """ + Stop the sensor streaming. + + Implementations should handle gracefully shutting down the sensor. + """ pass @abstractmethod def update(self) -> None: + """ + Update the sensor state or data. + + Implementations should refresh or poll the sensor data as needed. + """ pass def __enter__(self) -> "SensorBase": + """ + Enter the runtime context for the sensor. + + This method calls start() and returns the sensor instance. + + Returns: + SensorBase: The sensor instance. + """ self.start() return self def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: + """ + Exit the runtime context for the sensor. + + This method calls stop() to shut down the sensor. + + Args: + exc_type (Any): Exception type if raised. + exc_value (Any): Exception value if raised. + traceback (Any): Traceback if an exception occurred. + """ self.stop() @property @abstractmethod def is_streaming(self) -> bool: + """ + Check if the sensor is currently streaming. + + Returns: + bool: True if the sensor is streaming, False otherwise. + """ pass class ADCBase(SensorBase, ABC): + """ + Abstract base class for ADC (Analog-to-Digital Converter) sensors. + + ADC sensors are used to convert analog signals into digital data. + """ + def __init__(self) -> None: + """ + Initialize the ADC sensor. + """ super().__init__() def __repr__(self) -> str: + """ + Return a string representation of the ADC sensor. + + Returns: + str: "ADCBase" + """ return "ADCBase" def reset(self) -> None: + """ + Reset the ADC sensor. + + Implementations should clear any stored state or calibration. + """ pass def calibrate(self) -> None: + """ + Calibrate the ADC sensor. + + Implementations should perform necessary calibration procedures. + """ pass class EncoderBase(SensorBase, ABC): + """ + Abstract base class for encoder sensors. + + Encoders are used to measure position and velocity. + """ + def __init__(self) -> None: + """ + Initialize the encoder sensor. + """ super().__init__() def __repr__(self) -> str: + """ + Return a string representation of the encoder sensor. + + Returns: + str: "EncoderBase" + """ return "EncoderBase" @property @abstractmethod def position(self) -> float: + """ + Get the current encoder position. + + Returns: + float: The current position value. + """ pass @property @abstractmethod def velocity(self) -> float: + """ + Get the current encoder velocity. + + Returns: + float: The current velocity value. + """ pass class LoadcellBase(SensorBase, ABC): + """ + Abstract base class for load cell sensors. + + Load cells are used to measure forces and moments. + """ + def __init__(self) -> None: + """ + Initialize the load cell sensor. + """ pass def __repr__(self) -> str: + """ + Return a string representation of the load cell sensor. + + Returns: + str: "LoadcellBase" + """ return "LoadcellBase" @abstractmethod def calibrate(self) -> None: + """ + Calibrate the load cell sensor. + + Implementations should perform the calibration procedure to ensure accurate readings. + """ pass @abstractmethod def reset(self) -> None: + """ + Reset the load cell sensor. + + Implementations should reset the sensor state and any calibration data. + """ pass @property @abstractmethod def fx(self) -> float: + """ + Get the force along the x-axis. + + Returns: + float: The force measured along the x-axis. + """ pass @property @abstractmethod def fy(self) -> float: + """ + Get the force along the y-axis. + + Returns: + float: The force measured along the y-axis. + """ pass @property @abstractmethod def fz(self) -> float: + """ + Get the force along the z-axis. + + Returns: + float: The force measured along the z-axis. + """ pass @property @abstractmethod def mx(self) -> float: + """ + Get the moment about the x-axis. + + Returns: + float: The moment measured about the x-axis. + """ pass @property @abstractmethod def my(self) -> float: + """ + Get the moment about the y-axis. + + Returns: + float: The moment measured about the y-axis. + """ pass @property @abstractmethod def mz(self) -> float: + """ + Get the moment about the z-axis. + + Returns: + float: The moment measured about the z-axis. + """ pass @property @abstractmethod def is_calibrated(self) -> bool: + """ + Check if the load cell sensor is calibrated. + + Returns: + bool: True if calibrated, False otherwise. + """ pass class IMUBase(SensorBase, ABC): + """ + Abstract base class for Inertial Measurement Unit (IMU) sensors. + + IMUs typically provide acceleration and gyroscopic data. + """ + def __init__(self) -> None: + """ + Initialize the IMU sensor. + """ pass def __repr__(self) -> str: + """ + Return a string representation of the IMU sensor. + + Returns: + str: "IMU" + """ return "IMU" @property @abstractmethod def acc_x(self) -> float: """ - Returns estimated linear acceleration along the x-axis (m/s^2). + Get the estimated linear acceleration along the x-axis. + + Returns: + float: Acceleration in m/s^2 along the x-axis. """ pass @property @abstractmethod def acc_y(self) -> float: + """ + Get the estimated linear acceleration along the y-axis. + + Returns: + float: Acceleration in m/s^2 along the y-axis. + """ pass @property @abstractmethod def acc_z(self) -> float: + """ + Get the estimated linear acceleration along the z-axis. + + Returns: + float: Acceleration in m/s^2 along the z-axis. + """ pass @property @abstractmethod def gyro_x(self) -> float: + """ + Get the gyroscopic measurement along the x-axis. + + Returns: + float: Angular velocity in rad/s along the x-axis. + """ pass @property @abstractmethod def gyro_y(self) -> float: + """ + Get the gyroscopic measurement along the y-axis. + + Returns: + float: Angular velocity in rad/s along the y-axis. + """ pass @property @abstractmethod def gyro_z(self) -> float: + """ + Get the gyroscopic measurement along the z-axis. + + Returns: + float: Angular velocity in rad/s along the z-axis. + """ pass diff --git a/opensourceleg/sensors/imu.py b/opensourceleg/sensors/imu.py index 989c6a0..06072d5 100644 --- a/opensourceleg/sensors/imu.py +++ b/opensourceleg/sensors/imu.py @@ -1,8 +1,24 @@ +""" +Module for interfacing with IMU sensors using the MSCL and Adafruit libraries. + +This module provides two IMU sensor implementations: + - LordMicrostrainIMU: Uses the MSCL library to interface with a Lord Microstrain IMU. + - BNO055: Uses the Adafruit BNO055 library to interface with a Bosch BNO055 IMU. + +Dependencies: + - MSCL (for LordMicrostrainIMU): https://github.com/LORD-MicroStrain/MSCL/tree/master + - adafruit_bno055, board, busio (for BNO055) + +Ensure that the required libraries are installed and that the library paths are added +to PYTHONPATH or sys.path if necessary. +""" + from typing import Any, Union from opensourceleg.logging import LOGGER from opensourceleg.sensors.base import IMUBase, check_sensor_stream +# Attempt to import the MSCL library and add its path. try: import sys @@ -10,21 +26,24 @@ import mscl except ImportError: print( - "Failed to import mscl. Please install the MSCL library from Lord Microstrain and append the path" - "to the PYTHONPATH or sys.path. Checkout https://github.com/LORD-MicroStrain/MSCL/tree/master" + "Failed to import mscl. Please install the MSCL library from Lord Microstrain and append the path " + "to the PYTHONPATH or sys.path. Checkout https://github.com/LORD-MicroStrain/MSCL/tree/master " "and https://lord-microstrain.github.io/MSCL/Documentation/MSCL%20API%20Documentation/index.html" ) +# Attempt to import the Adafruit BNO055 library. try: import adafruit_bno055 except ImportError: print("Failed to import adafruit_bno055") +# Attempt to import board module. try: import board except ImportError: print("Failed to import board") +# Attempt to import busio module. try: import busio except ImportError: @@ -34,14 +53,16 @@ class LordMicrostrainIMU(IMUBase): """ Sensor class for the Lord Microstrain IMU. - Requires the MSCL library from Lord Microstrain (see below for install instructions). - - As configured, this class returns euler angles (rad), angular rates (rad/s), and accelerations (g). + This class interfaces with a Lord Microstrain Inertial Measurement Unit (IMU) + via the MSCL library. It returns Euler angles (in radians), angular rates (in rad/s), + and linear accelerations (in m/s^2). Resources: - * To install, download the pre-built package for raspian at https://github.com/LORD-MicroStrain/MSCL/tree/master - * Full documentation for their library can be found at https://lord-microstrain.github.io/MSCL/Documentation/MSCL%20API%20Documentation/index.html. + - Download MSCL pre-built package for Raspbian: + https://github.com/LORD-MicroStrain/MSCL/tree/master + - MSCL API Documentation: + https://lord-microstrain.github.io/MSCL/Documentation/MSCL%20API%20Documentation/index.html """ def __init__( @@ -50,6 +71,14 @@ def __init__( baud_rate: int = 921600, frequency: int = 200, ) -> None: + """ + Initialize the LordMicrostrainIMU sensor. + + Args: + port (str, optional): Serial port for the IMU. Defaults to "/dev/ttyUSB0". + baud_rate (int, optional): Baud rate for the serial connection. Defaults to 921600. + frequency (int, optional): Data streaming frequency in Hz. Defaults to 200. + """ self._port = port self._baud_rate = baud_rate self._frequency = frequency @@ -58,6 +87,18 @@ def __init__( self._data: dict[str, float] = {} def _configure_mip_channels(self) -> Any: + """ + Configure and return the MIP channels for data streaming. + + Sets up the desired channels for: + - Estimated orientation (Euler angles) + - Estimated angular rate + - Estimated linear acceleration + - GPS timestamp + + Returns: + Any: A configured MipChannels object for the MSCL InertialNode. + """ channels = mscl.MipChannels() channels.append( mscl.MipChannel( @@ -87,6 +128,12 @@ def _configure_mip_channels(self) -> Any: return channels def start(self) -> None: + """ + Start the Lord Microstrain IMU sensor. + + Establishes a serial connection, configures the MIP channels, enables data streaming, + and sets the streaming flag to True. + """ self._connection = mscl.Connection.Serial(self.port, self.baud_rate) self._node = mscl.InertialNode(self._connection) self._node.setActiveChannelFields(mscl.MipTypes.CLASS_ESTFILTER, self._configure_mip_channels()) @@ -95,11 +142,27 @@ def start(self) -> None: @check_sensor_stream def stop(self) -> None: + """ + Stop the Lord Microstrain IMU sensor. + + Sets the node to idle mode and updates the streaming flag to False. + + Raises: + SensorNotStreamingException: If the sensor is not currently streaming. + """ self._node.setToIdle() self._is_streaming = False @check_sensor_stream def ping(self) -> None: + """ + Ping the Lord Microstrain IMU sensor to verify connectivity. + + Logs an info message if the ping is successful, otherwise logs an error. + + Raises: + SensorNotStreamingException: If the sensor is not currently streaming. + """ response = self._node.ping() if response.success(): @@ -109,7 +172,15 @@ def ping(self) -> None: def update(self, timeout: int = 500, max_packets: int = 1, return_packets: bool = False) -> Union[None, Any]: """ - Get IMU data from the Lord Microstrain IMU + Retrieve and update IMU data from the sensor. + + Args: + timeout (int, optional): Timeout for data packet retrieval in milliseconds. Defaults to 500. + max_packets (int, optional): Maximum number of data packets to retrieve. Defaults to 1. + return_packets (bool, optional): If True, returns the raw data packets. Defaults to False. + + Returns: + Union[None, Any]: Returns the data packets if `return_packets` is True; otherwise, None. """ data_packets = self._node.getDataPackets(timeout=timeout, maxPackets=max_packets) data_points = data_packets[-1].data() @@ -121,147 +192,298 @@ def update(self, timeout: int = 500, max_packets: int = 1, return_packets: bool return None def __repr__(self) -> str: + """ + Return a string representation of the LordMicrostrainIMU sensor. + + Returns: + str: "IMULordMicrostrain" + """ return "IMULordMicrostrain" @property def port(self) -> str: + """ + Get the serial port used by the sensor. + + Returns: + str: The serial port. + """ return self._port @property def baud_rate(self) -> int: + """ + Get the baud rate used for the sensor connection. + + Returns: + int: The baud rate. + """ return self._baud_rate @property def frequency(self) -> int: + """ + Get the data streaming frequency of the sensor. + + Returns: + int: The streaming frequency in Hz. + """ return self._frequency @property def is_streaming(self) -> bool: + """ + Check if the sensor is currently streaming data. + + Returns: + bool: True if streaming; otherwise, False. + """ return self._is_streaming @property def data(self) -> dict[str, float]: + """ + Get the latest sensor data. + + Returns: + dict[str, float]: A dictionary mapping channel names to their float values. + """ return self._data @property def roll(self) -> float: - """Returns estimated roll (rad).""" + """ + Get the estimated roll angle in radians. + + Returns: + float: Roll angle (rad). + """ return self._data["estRoll"] @property def pitch(self) -> float: - """Returns estimated pitch (rad).""" + """ + Get the estimated pitch angle in radians. + + Returns: + float: Pitch angle (rad). + """ return self._data["estPitch"] @property def yaw(self) -> float: - """Returns estimated yaw (rad).""" + """ + Get the estimated yaw angle in radians. + + Returns: + float: Yaw angle (rad). + """ return self._data["estYaw"] @property def vel_x(self) -> float: - """Returns estimated angular velocity about the x-axis (rad/s).""" + """ + Get the estimated angular velocity about the x-axis in rad/s. + + Returns: + float: Angular velocity (rad/s) about the x-axis. + """ return self._data["estAngularRateX"] @property def vel_y(self) -> float: - """Returns estimated angular velocity about the y-axis (rad/s).""" + """ + Get the estimated angular velocity about the y-axis in rad/s. + + Returns: + float: Angular velocity (rad/s) about the y-axis. + """ return self._data["estAngularRateY"] @property def vel_z(self) -> float: - """Returns estimated angular velocity about the z-axis (rad/s).""" + """ + Get the estimated angular velocity about the z-axis in rad/s. + + Returns: + float: Angular velocity (rad/s) about the z-axis. + """ return self._data["estAngularRateZ"] @property def acc_x(self) -> float: - """Returns estimated linear acceleration along the x-axis (m/s^2).""" + """ + Get the estimated linear acceleration along the x-axis in m/s². + + Returns: + float: Linear acceleration (m/s²) along the x-axis. + """ return self._data["estLinearAccelX"] @property def acc_y(self) -> float: - """Returns estimated linear acceleration along the y-axis (m/s^2).""" + """ + Get the estimated linear acceleration along the y-axis in m/s². + + Returns: + float: Linear acceleration (m/s²) along the y-axis. + """ return self._data["estLinearAccelY"] @property def acc_z(self) -> float: - """Returns estimated linear acceleration along the z-axis (m/s^2).""" + """ + Get the estimated linear acceleration along the z-axis in m/s². + + Returns: + float: Linear acceleration (m/s²) along the z-axis. + """ return self._data["estLinearAccelZ"] @property def gyro_x(self) -> float: + """ + Get the measured gyroscopic value for the x-axis. + + Note: + Gyro data is not available for the Lord Microstrain IMU, so this returns 0.0 + and logs a warning. + + Returns: + float: 0.0 + """ LOGGER.warning("Gyro data not available for Lord Microstrain IMU") return 0.0 @property def gyro_y(self) -> float: + """ + Get the measured gyroscopic value for the y-axis. + + Note: + Gyro data is not available for the Lord Microstrain IMU, so this returns 0.0 + and logs a warning. + + Returns: + float: 0.0 + """ LOGGER.warning("Gyro data not available for Lord Microstrain IMU") return 0.0 @property def gyro_z(self) -> float: + """ + Get the measured gyroscopic value for the z-axis. + + Note: + Gyro data is not available for the Lord Microstrain IMU, so this returns 0.0 + and logs a warning. + + Returns: + float: 0.0 + """ LOGGER.warning("Gyro data not available for Lord Microstrain IMU") return 0.0 @property def timestamp(self) -> float: - """Returns timestamp (s) of the data.""" + """ + Get the timestamp of the latest data packet in seconds. + + Returns: + float: Timestamp (s) from the sensor data. + """ return self._data["estFilterGpsTimeTow"] class BNO055(IMUBase): """ Sensor class for the Bosch BNO055 IMU. - This class wraps a more comprehensive Adafruit library for a simplified use consistent - with the OSL library framework. + + This class wraps the Adafruit BNO055 library to provide an interface + consistent with the OSL sensor framework. Connections: - * The sensor should be connected to the main i2c bus. - * It is also possible to connect it via uart, but this is not yet implemented. + - The sensor should be connected to the main I2C bus. + - UART connectivity is not implemented. Requirements: - * adafruit_bno055 - * board - * busio + - adafruit_bno055 + - board + - busio + + Author: + Kevin Best - Author: Kevin Best - Date: 8/22/2024 + Date: + 8/22/2024 """ def __init__( self, addr: int = 40, ) -> None: + """ + Initialize the BNO055 sensor. + + Args: + addr (int, optional): I2C address of the BNO055 sensor. Defaults to 40. + """ self._address: int = addr self._gyro_data: list[float] = [0.0, 0.0, 0.0] self._acc_data: list[float] = [0.0, 0.0, 0.0] self._is_streaming = False def __repr__(self) -> str: + """ + Return a string representation of the BNO055 sensor. + + Returns: + str: "BNO055_IMU" + """ return "BNO055_IMU" def start(self) -> None: + """ + Start the BNO055 sensor. + + Initializes the I2C bus, creates an instance of the Adafruit BNO055 sensor, + configures the sensor settings, and sets the streaming flag to True. + """ i2c = busio.I2C(board.SCL, board.SDA) try: self._adafruit_imu = adafruit_bno055.BNO055_I2C(i2c, address=self._address) except ValueError: print("BNO055 IMU Not Found on i2c bus! Check wiring!") - self.configure_IMU_settings() self._is_streaming = True def stop(self) -> None: + """ + Stop the BNO055 sensor. + + Sets the streaming flag to False. + """ self._is_streaming = False def update(self) -> None: + """ + Update the sensor data from the BNO055. + + Reads the latest acceleration and gyroscopic data from the sensor. + """ self._acc_data = self._adafruit_imu.acceleration self._gyro_data = self._adafruit_imu.gyro def configure_IMU_settings(self) -> None: """ - Configure the IMU mode and different range/bandwidth settings. - Hard coding settings for now. - Someone in the future could add wrapper support for these. + Configure the BNO055 sensor settings. + + Hard-coded configuration: + - Enables external crystal. + - Sets mode to ACCGYRO_MODE. + - Configures accelerometer range to ACCEL_2G and bandwidth to ACCEL_15_63HZ. + - Configures gyroscope range to GYRO_1000_DPS and bandwidth to GYRO_23HZ. """ self._adafruit_imu.use_external_crystal = True self._adafruit_imu.mode = adafruit_bno055.ACCGYRO_MODE @@ -272,38 +494,75 @@ def configure_IMU_settings(self) -> None: @property def acc_x(self) -> float: - """Returns measured acceleration along the x-axis (m/s^2).""" + """ + Get the measured acceleration along the x-axis in m/s². + + Returns: + float: Acceleration (m/s²) along the x-axis. + """ return self._acc_data[0] @property def acc_y(self) -> float: - """Returns measured acceleration along the y-axis (m/s^2).""" + """ + Get the measured acceleration along the y-axis in m/s². + + Returns: + float: Acceleration (m/s²) along the y-axis. + """ return self._acc_data[1] @property def acc_z(self) -> float: - """Returns measured acceleration along the z-axis (m/s^2).""" + """ + Get the measured acceleration along the z-axis in m/s². + + Returns: + float: Acceleration (m/s²) along the z-axis. + """ return self._acc_data[2] @property def gyro_x(self) -> float: - """Returns measured rotational velocity about the x-axis (rad/s).""" + """ + Get the measured rotational velocity about the x-axis in rad/s. + + Returns: + float: Angular velocity (rad/s) about the x-axis. + """ return self._gyro_data[0] @property def gyro_y(self) -> float: - """Returns measured rotational velocity about the y-axis (rad/s).""" + """ + Get the measured rotational velocity about the y-axis in rad/s. + + Returns: + float: Angular velocity (rad/s) about the y-axis. + """ return self._gyro_data[1] @property def gyro_z(self) -> float: - """Returns measured rotational velocity about the z-axis (rad/s).""" + """ + Get the measured rotational velocity about the z-axis in rad/s. + + Returns: + float: Angular velocity (rad/s) about the z-axis. + """ return self._gyro_data[2] @property def is_streaming(self) -> bool: + """ + Check if the BNO055 sensor is streaming data. + + Returns: + bool: True if streaming; otherwise, False. + """ return self._is_streaming if __name__ == "__main__": + # TODO: Add sample code depicting usage. pass diff --git a/opensourceleg/sensors/loadcell.py b/opensourceleg/sensors/loadcell.py index 2844484..74f3728 100644 --- a/opensourceleg/sensors/loadcell.py +++ b/opensourceleg/sensors/loadcell.py @@ -1,3 +1,22 @@ +""" +Module for interfacing with SRILoadcell sensors. + +This module provides an implementation of a load cell sensor (SRILoadcell) that +inherits from LoadcellBase. It uses an I2C interface via SMBus to communicate with +a strain amplifier and processes raw ADC data to compute forces and moments. + +Classes: + LoadcellNotRespondingException: Exception raised when the load cell does not respond. + MEMORY_CHANNELS: Enum representing memory channel addresses for load cell readings. + SRILoadcell: Concrete implementation of a load cell sensor that provides force and moment data. + +Dependencies: + - numpy + - smbus2 + - opensourceleg.logging + - opensourceleg.sensors.base +""" + import time from enum import Enum from typing import Any, Callable, Optional @@ -11,11 +30,30 @@ class LoadcellNotRespondingException(Exception): + """ + Exception raised when the load cell fails to respond. + + Attributes: + message (str): Description of the error. + """ + def __init__(self, message: str = "Load cell unresponsive.") -> None: + """ + Initialize the LoadcellNotRespondingException. + + Args: + message (str, optional): Error message. Defaults to "Load cell unresponsive.". + """ super().__init__(message) class MEMORY_CHANNELS(int, Enum): + """ + Enumeration of memory channel addresses used by the load cell. + + Each channel corresponds to a specific high or low byte of the ADC data. + """ + CH1_H = 8 CH1_L = 9 CH2_H = 10 @@ -31,6 +69,18 @@ class MEMORY_CHANNELS(int, Enum): class SRILoadcell(LoadcellBase): + """ + Implementation of a load cell sensor using the SRILoadcell hardware. + + This class communicates with a strain amplifier via I2C using the SMBus interface, + processes the raw ADC data, and computes forces (Fx, Fy, Fz) and moments (Mx, My, Mz) + based on a provided calibration matrix and hardware configuration. + + Class Attributes: + ADC_RANGE (int): The maximum ADC value (2**12 - 1). + OFFSET (float): The ADC mid-scale offset (half of 2**12). + """ + ADC_RANGE = 2**12 - 1 OFFSET = 2**12 / 2 @@ -43,9 +93,23 @@ def __init__( i2c_address: int = 0x66, ) -> None: """ - TODO: Write docstring for initial values + Initialize the SRILoadcell sensor. + + Validates the provided parameters and initializes internal variables for data + acquisition, calibration, and streaming. + + Args: + calibration_matrix (npt.NDArray[np.double]): A 6x6 calibration matrix. + amp_gain (float, optional): Amplifier gain; must be greater than 0. Defaults to 125.0. + exc (float, optional): Excitation voltage; must be greater than 0. Defaults to 5.0. + bus (int, optional): I2C bus number to use. Defaults to 1. + i2c_address (int, optional): I2C address of the strain amplifier. Defaults to 0x66. + + Raises: + TypeError: If calibration_matrix is not a 6x6 array. + ValueError: If amp_gain or exc are not greater than 0. """ - # Check that parameters are set correctly: + # Validate input parameters. if calibration_matrix.shape != (6, 6): LOGGER.info(f"[{self.__repr__()}] calibration_matrix must be a 6x6 array of np.double.") raise TypeError("calibration_matrix must be a 6x6 array of np.double.") @@ -74,11 +138,22 @@ def __init__( self._is_streaming: bool = False def start(self) -> None: + """ + Start the load cell sensor. + + Opens the I2C connection using SMBus, waits briefly for hardware stabilization, + and sets the streaming flag to True. + """ self._smbus = SMBus(self._bus) time.sleep(1) self._is_streaming = True def reset(self) -> None: + """ + Reset the load cell calibration. + + Resets the calibration offset to the zero value and marks the sensor as uncalibrated. + """ self._calibration_offset = self._zero_calibration_offset self._is_calibrated = False @@ -88,8 +163,17 @@ def update( data_callback: Optional[Callable[..., npt.NDArray[np.uint8]]] = None, ) -> None: """ - Queries the loadcell for the latest data. - Latest data can then be accessed via properties, e.g. loadcell.Fx. + Query the load cell for the latest data and update internal state. + + Reads raw ADC data (either via a provided callback or by reading from I2C), + converts it to engineering units using the calibration matrix, amplifier gain, + and excitation voltage, and subtracts any calibration offset. + + Args: + calibration_offset (Optional[npt.NDArray[np.double]], optional): + An offset to subtract from the processed data. If None, uses the current calibration offset. + data_callback (Optional[Callable[..., npt.NDArray[np.uint8]]], optional): + A callback function to provide raw data. If not provided, the sensor's internal method is used. """ data = data_callback() if data_callback else self._read_compressed_strain() @@ -99,6 +183,7 @@ def update( signed_data = ((data - self.OFFSET) / self.ADC_RANGE) * self._exc coupled_data = signed_data * 1000 / (self._exc * self._amp_gain) + # Process the data using the calibration matrix and subtract the offset. self._data = np.transpose(a=self._calibration_matrix.dot(b=np.transpose(a=coupled_data))) - calibration_offset def calibrate( @@ -108,10 +193,20 @@ def calibrate( data_callback: Optional[Callable[[], npt.NDArray[np.uint8]]] = None, ) -> None: """ - Obtains the initial loadcell reading (aka) loadcell_zero. - This is automatically subtracted off for subsequent calls of the update method. + Perform a zeroing (calibration) routine for the load cell. + + This method obtains an initial zero-load reading that is subtracted from subsequent + measurements. If the sensor has already been calibrated and 'reset' is False, a log message + is displayed. + + Args: + number_of_iterations (int, optional): Number of iterations to average for calibration. + Defaults to 2000. + reset (bool, optional): If True, forces recalibration by resetting the current calibration. + Defaults to False. + data_callback (Optional[Callable[[], npt.NDArray[np.uint8]]], optional): Optional callback + to provide raw data. Defaults to None. """ - if not self.is_calibrated: LOGGER.info( f"[{self.__repr__()}] Initiating zeroing routine, please ensure that there is no ground contact force." @@ -135,7 +230,6 @@ def calibrate( elif reset: self.reset() self.calibrate() - else: LOGGER.info( f"[{self.__repr__()}] Loadcell has already been zeroed. " @@ -143,12 +237,26 @@ def calibrate( ) def stop(self) -> None: + """ + Stop the load cell sensor. + + Sets the streaming flag to False and closes the I2C connection if it is open. + """ self._is_streaming = False if hasattr(self, "_smbus"): self._smbus.close() def _read_compressed_strain(self) -> Any: - """Used for more recent versions of strain amp firmware""" + """ + Read and unpack compressed strain data from the sensor. + + This method reads a block of data from the sensor via I2C and then unpacks it + using the compressed strain format. If multiple read attempts fail, a + LoadcellNotRespondingException is raised. + + Returns: + Any: The unpacked strain data. + """ try: data = self._smbus.read_i2c_block_data(self._i2c_address, MEMORY_CHANNELS.CH1_H, 10) self.failed_reads = 0 @@ -162,7 +270,17 @@ def _read_compressed_strain(self) -> Any: @staticmethod def _unpack_uncompressed_strain(data: npt.NDArray[np.uint8]) -> npt.NDArray[np.uint16]: - """Used for an older version of the strain amp firmware (at least pre-2017)""" + """ + Unpack raw ADC data using the uncompressed format. + + This method is used for older versions of the strain amplifier firmware (pre-2017). + + Args: + data (npt.NDArray[np.uint8]): Raw data read from the sensor. + + Returns: + npt.NDArray[np.uint16]: An array containing the unpacked values for 6 channels. + """ ch1 = (data[0] << 8) | data[1] ch2 = (data[2] << 8) | data[3] ch3 = (data[4] << 8) | data[5] @@ -173,7 +291,17 @@ def _unpack_uncompressed_strain(data: npt.NDArray[np.uint8]) -> npt.NDArray[np.u @staticmethod def _unpack_compressed_strain(data: npt.NDArray[np.uint8]) -> npt.NDArray[np.uint16]: - """Used for more recent versions of strainamp firmware""" + """ + Unpack raw ADC data using the compressed format. + + This method is used for more recent versions of the strain amplifier firmware. + + Args: + data (npt.NDArray[np.uint8]): Raw data read from the sensor. + + Returns: + npt.NDArray[np.uint16]: An array containing the unpacked values for 6 channels. + """ return np.array( object=[ (data[0] << 4) | ((data[1] >> 4) & 0x0F), @@ -187,68 +315,104 @@ def _unpack_compressed_strain(data: npt.NDArray[np.uint8]) -> npt.NDArray[np.uin @property def is_calibrated(self) -> bool: - """Indicates if load cell zeroing routine has been called.""" + """ + Indicates whether the load cell has been calibrated (zeroed). + + Returns: + bool: True if the calibration routine has been successfully completed; otherwise, False. + """ return self._is_calibrated @property def is_streaming(self) -> bool: + """ + Check if the load cell is currently streaming data. + + Returns: + bool: True if streaming; otherwise, False. + """ return self._is_streaming @property def fx(self) -> float: """ - Latest force in the x (medial/lateral) direction in Newtons. - If using the standard OSL setup, this is positive towards the user's right. + Get the latest force in the x (medial/lateral) direction in Newtons. + + For the standard OSL setup, this value is positive towards the user's right. + + Returns: + float: Force (N) along the x-axis. """ return float(self.data[0]) @property def fy(self) -> float: """ - Latest force in the y (anterior/posterior) direction in Newtons. - If using the standard OSL setup, this is positive in the posterior direction. + Get the latest force in the y (anterior/posterior) direction in Newtons. + + For the standard OSL setup, this value is positive in the posterior direction. + + Returns: + float: Force (N) along the y-axis. """ return float(self.data[1]) @property def fz(self) -> float: """ - Latest force in the z (vertical) direction in Newtons. - If using the standard OSL setup, this should be positive downwards. - i.e. quiet standing on the OSL should give a negative Fz. + Get the latest force in the z (vertical) direction in Newtons. + + For the standard OSL setup, this value is positive downwards. In quiet standing, + a negative Fz value is expected. + + Returns: + float: Force (N) along the z-axis. """ return float(self.data[2]) @property def mx(self) -> float: """ - Latest moment about the x (medial/lateral) axis in Nm. - If using the standard OSL setup, this axis is positive towards the user's right. + Get the latest moment about the x (medial/lateral) axis in Nm. + + For the standard OSL setup, this moment is positive towards the user's right. + + Returns: + float: Moment (Nm) about the x-axis. """ return float(self.data[3]) @property def my(self) -> float: """ - Latest moment about the y (anterior/posterior) axis in Nm. - If using the standard OSL setup, this axis is positive in the posterior direction. + Get the latest moment about the y (anterior/posterior) axis in Nm. + + For the standard OSL setup, this moment is positive in the posterior direction. + + Returns: + float: Moment (Nm) about the y-axis. """ return float(self.data[4]) @property def mz(self) -> float: """ - Latest moment about the z (vertical) axis in Nm. - If using the standard OSL setup, this axis is positive towards the ground. + Get the latest moment about the z (vertical) axis in Nm. + + For the standard OSL setup, this moment is positive towards the ground. + + Returns: + float: Moment (Nm) about the z-axis. """ return float(self.data[5]) @property def data(self) -> Any: """ - Returns a vector of the latest loadcell data. - [Fx, Fy, Fz, Mx, My, Mz] - Forces in N, moments in Nm. + Get the latest processed load cell data. + + Returns: + Any: A 1D vector containing [Fx, Fy, Fz, Mx, My, Mz], where forces are in Newtons and moments in Nm. """ if self._data is not None: return self._data[0] diff --git a/opensourceleg/time/time.py b/opensourceleg/time/time.py index f0606f2..9c8a6c7 100644 --- a/opensourceleg/time/time.py +++ b/opensourceleg/time/time.py @@ -38,10 +38,35 @@ def __repr__(self) -> str: return "LoopKiller" def handle_signal(self, signum: Any, frame: Any) -> None: + """ + Method to handle the signal from the operating system. + This method is called when the operating system sends a signal to the process. + The signal is typically a shutdown signal, such as SIGTERM, SIGINT, or SIGHUP. + + Args: + signum (Any): The signal number. + frame (Any): The frame object. + + Returns: + None + + Example: + >>> killer = LoopKiller() + >>> killer.handle_signal(signal.SIGTERM, None) + """ self.kill_now = True def get_fade(self) -> float: - # interpolates from 1 to zero with soft fade out + """ + Interpolates from 1 to zero with soft fade out. + + Returns: + float: The fade value. + + Example: + >>> killer = LoopKiller() + >>> killer.get_fade() + """ if self._kill_soon: t = time.monotonic() - self._soft_kill_time if t >= self._fade_time: @@ -54,6 +79,19 @@ def get_fade(self) -> float: @property def kill_now(self) -> bool: + """ + Property to get the kill_now value. + If the kill_now value is True, the loop will stop iterating. + If the kill_now value is False, the loop will continue iterating. + + Returns: + bool: The kill_now value. + + Example: + >>> killer = LoopKiller() + >>> killer.kill_now + """ + if self._kill_now: return True if self._kill_soon: @@ -64,6 +102,15 @@ def kill_now(self) -> bool: @kill_now.setter def kill_now(self, val: bool) -> None: + """ + Setter for the kill_now value. If true is set twice, then the loop will stop iterating immediately. + + Args: + val (bool): The value to set the kill_now value to. + + Returns: + None + """ if val: if self._kill_soon: # if you kill twice, then it becomes immediate self._kill_now = True @@ -127,9 +174,34 @@ def __del__(self) -> None: @property def fade(self) -> float: + """ + Property to get the fade value. + + Returns: + float: The fade value. + + Example: + >>> loop = SoftRealtimeLoop() + >>> loop.fade + """ return self.killer.get_fade() def run(self, function_in_loop: Callable, dt: Optional[float] = None) -> None: + """ + Method to run the function within the time loop. + + Args: + function_in_loop (Callable): The function to run within the time loop. + dt (Optional[float]): The time delta. Defaults to None. + + Returns: + None + + Example: + >>> loop = SoftRealtimeLoop() + >>> loop.run(function_in_loop) + TODO: Better example here. + """ if dt is None: dt = self.dt self.t0 = self.t1 = time.monotonic() + dt @@ -143,12 +215,44 @@ def run(self, function_in_loop: Callable, dt: Optional[float] = None) -> None: self.t1 += dt def stop(self) -> None: + """ + Method to stop the loop. + + Returns: + None + + Example: + >>> loop = SoftRealtimeLoop() + >>> loop.start() + ... Running loop ... + >>> loop.stop() + """ self.killer.kill_now = True def time(self) -> float: + """ + Method to get the current time since the start of the time loop. + + Returns: + float: The time since the start of the time loop. + + Example: + >>> loop = SoftRealtimeLoop() + >>> loop.time() + """ return time.monotonic() - self.t0 def time_since(self) -> float: + """ + Method to get the time since the last loop. TODO: Is this true? + + Returns: + float: The time since the last loop. + + Example: + >>> loop = SoftRealtimeLoop() + >>> loop.time_since() + """ return time.monotonic() - self.t1 def __iter__(self) -> "SoftRealtimeLoop": diff --git a/tests/test_logging/test_logging_logger.py b/tests/test_logging/test_logging_logger.py index 7316886..b70e34c 100644 --- a/tests/test_logging/test_logging_logger.py +++ b/tests/test_logging/test_logging_logger.py @@ -152,6 +152,7 @@ def test_repr(test_logger: Logger): # Test set file name def test_set_file_name_str(test_logger: Logger): test_logger.set_file_name("test_file") + print(test_logger._file_path) assert all([ test_logger._user_file_name == "test_file", test_logger._file_path == f"{CURR_DIR}/test_file.log", diff --git a/tests/test_time/test_time.py b/tests/test_time/test_time.py index e8816da..468e303 100644 --- a/tests/test_time/test_time.py +++ b/tests/test_time/test_time.py @@ -108,7 +108,7 @@ def test_softrealtimeloop_init(patch_time_time2): assert srtl.sum_var == 0.0 assert srtl.sleep_t_agg == 0.0 assert srtl.n == 0 - assert srtl.report is False + assert srtl.report is True @pytest.fixture diff --git a/tutorials/actuators/dephy/sensor_reading.py b/tutorials/actuators/dephy/sensor_reading.py index 137bc0f..15ef502 100644 --- a/tutorials/actuators/dephy/sensor_reading.py +++ b/tutorials/actuators/dephy/sensor_reading.py @@ -12,7 +12,7 @@ ) with actpack: - for t in clock: + for _t in clock: actpack.update() LOGGER.info( f"Motor Position: {actpack.motor_position}; " diff --git a/tutorials/actuators/dephy/test_dephy.py b/tutorials/actuators/dephy/test_dephy.py new file mode 100644 index 0000000..0d7af23 --- /dev/null +++ b/tutorials/actuators/dephy/test_dephy.py @@ -0,0 +1,185 @@ +import time + +import numpy as np + +import opensourceleg.actuators.dephy as Dephy +from opensourceleg.actuators.base import CONTROL_MODES +from opensourceleg.logging.logger import LOGGER +from opensourceleg.time import SoftRealtimeLoop + +TIME_TO_STEP = 1.0 +DT = 0.001 +clock = SoftRealtimeLoop(dt=DT) + + +def main(): + actpack = Dephy.DephyActuator( + port="/dev/ttyACM0", + gear_ratio=9.0, + ) + + control_types = { + "Current": test_current_control, + "Position": test_position_control, + "Voltage": test_voltage_control, + "Torque": test_voltage_control, + } + + with actpack: + try: + actpack.start() + except OSError: + LOGGER.exception(msg="OSError for actpack.start()") + + control_type = input("Please select a type of control:\n 1. Current\n 2. Position\n 3. Voltage\n 4. Torque\n") + + if control_type in control_types: + control_types[control_type](actpack) + else: + print("Please enter a valid control type. Valid control types are 'Current', 'Position', 'Voltage', 'Torque'") + + LOGGER.close() + actpack.stop() + exit() + + +def test_current_control(actpack: Dephy.DephyActuator): + t = clock.time() + with actpack: + try: + # Set control mode to current + actpack.set_control_mode(mode=CONTROL_MODES.CURRENT) + # Set current gains + actpack.set_current_gains() + # Set motor current + + actpack.update() + _prompt_to_continue(actpack, "current") + + current_t = clock.time() + while (t - current_t) < 1: + command_mcurrent = 2000 + actpack.set_motor_current(value=command_mcurrent) + + actpack.update() + _log_current_state(actpack, "Current", command_mcurrent) + + time.sleep(DT) + t = clock.time() + + command_mcurrent = 0 + actpack.set_motor_current(value=command_mcurrent) + + except KeyboardInterrupt: + LOGGER.exception("KeyboardInterrupt while setting motor current") + + +def test_position_control(actpack: Dephy.DephyActuator): + t = clock.time() + with actpack: + try: + # Set control mode to position + actpack.set_control_mode(mode=CONTROL_MODES.POSITION) + # Set position gains + actpack.set_position_gains() + # Set motor position + actpack.update() + motor_position = actpack.motor_position + _prompt_to_continue(actpack, "position") + current_t = clock.time() + while (t - current_t) < 5: + command_mposition = motor_position + np.pi + actpack.set_motor_position(value=command_mposition) + actpack.update() + _log_current_state(actpack, "Position", command_mposition) + time.sleep(DT) + t = clock.time() + + command_mposition = 0 + actpack.set_motor_position(value=command_mposition) + except KeyboardInterrupt: + LOGGER.exception("KeyboardInterrupt while setting motor position") + + +def test_voltage_control(actpack: Dephy.DephyActuator): + t = clock.time() + with actpack: + try: + # Set motor voltage + actpack.update() + _prompt_to_continue(actpack, "voltage") + current_t = clock.time() + while (t - current_t) < 2: + command_voltage = 2500 + actpack.set_motor_voltage(value=command_voltage) + actpack.update() + _log_current_state(actpack, "Voltage", command_voltage) + + time.sleep(DT) + t = clock.time() + + command_voltage = 0 + actpack.set_motor_voltage(value=command_voltage) + + except KeyboardInterrupt: + LOGGER.exception("KeyboardInterrupt while setting motor voltage") + + +def test_torque_control(actpack: Dephy.DephyActuator): + t = clock.time() + with actpack: + try: + # Set control mode to torque + actpack.set_control_mode(mode=CONTROL_MODES.TORQUE) + # Set motor current + actpack.update() + _prompt_to_continue(actpack, "torque") + + current_t = clock.time() + while (t - current_t) < 1: + command_torque = 0.005 + actpack.set_motor_torque(value=command_torque) + + actpack.update() + _log_current_state(actpack, "Torque", command_torque) + + time.sleep(DT) + t = clock.time() + + command_torque = 0 + actpack.set_motor_torque(value=command_torque) + + except KeyboardInterrupt: + LOGGER.exception("KeyboardInterrupt while setting motor torque") + + +def _prompt_to_continue(actpack: Dephy.DephyActuator, mode: str) -> None: + """Prompt to continue into next control mode""" + + LOGGER.info( + "".join( + f"Time:{clock.time()}\t" + + f"Motor Position: {actpack.motor_position}\t" + + f"Motor Voltage: {actpack.motor_voltage}\t" + + f"Motor Current: {actpack.motor_current}\t" + ) + ) + input(f"Press enter to continue into {mode} control mode") + return + + +def _log_current_state(actpack: Dephy.DephyActuator, mode: str, command: int): + """Helper function to log the current state""" + LOGGER.info( + "".join( + f"Time:{clock.time()}\t" + + f"Command {mode} : {command}\t" + + f"Motor Position: {actpack.motor_position}\t" + + f"Motor Voltage: {actpack.motor_voltage}\t" + + f"Motor Current: {actpack.motor_current}\t" + ) + ) + + +if __name__ == "__main__": + main() diff --git a/tutorials/actuators/dephy/voltage_control.py b/tutorials/actuators/dephy/voltage_control.py index 423284a..9854b30 100644 --- a/tutorials/actuators/dephy/voltage_control.py +++ b/tutorials/actuators/dephy/voltage_control.py @@ -15,7 +15,7 @@ with actpack: actpack.set_control_mode(mode=CONTROL_MODES.VOLTAGE) - + for t in clock: actpack.update()