diff --git a/armctl/universal_robots/protocols/config.xml b/armctl/universal_robots/protocols/config.xml index b4bf36f..ce429b7 100644 --- a/armctl/universal_robots/protocols/config.xml +++ b/armctl/universal_robots/protocols/config.xml @@ -3,6 +3,8 @@ + + diff --git a/armctl/universal_robots/protocols/rtde.py b/armctl/universal_robots/protocols/rtde.py index 54c03e4..ed734c0 100644 --- a/armctl/universal_robots/protocols/rtde.py +++ b/armctl/universal_robots/protocols/rtde.py @@ -3,8 +3,8 @@ from pathlib import Path from typing import NewType -import rtde.rtde as rtde -import rtde.rtde_config as rtde_config +from rtde import RTDE as _RTDE +from rtde.rtde_config import ConfigFile UINT32 = NewType("UINT32", int) @@ -12,13 +12,15 @@ class RTDE: def __init__(self, ip: str): config_file = Path(__file__).parent / "config.xml" - config = rtde_config.ConfigFile(str(config_file)) + config = ConfigFile(str(config_file)) state_names, state_types = config.get_recipe("out") - self.c = rtde.RTDE(ip) + self.c = _RTDE(ip) self.c.connect() self.c.send_output_setup(state_names, state_types) - self.c.get_controller_version() + self.controller_version = ( + self.c.get_controller_version() + ) # (MAJOR, MINOR, BUGFIX, BUILD) self.c.send_start() def _get_data(self): @@ -34,6 +36,20 @@ def tcp_pose(self) -> list[float]: """Return TCP pose [x, y, z, rx, ry, rz].""" return list(self._get_data().actual_TCP_pose) + def joint_torques(self) -> list[float]: + """Return joint torques in Nm converted from current.""" + # See: https://www.universal-robots.com/articles/ur/release-notes/release-note-software-version-523x/ + if self.controller_version >= (5, 23, 0, 0): + return list(self._get_data().actual_current_as_torque) + else: + raise NotImplementedError( + "Joint torques not available for controller versions below 5.23.0.0" + ) + + def tcp_force(self) -> list[float]: + """Return TCP force [Fx, Fy, Fz, Tx, Ty, Tz] in Newton and Newton-meters.""" + return list(self._get_data().actual_TCP_force) + def robot_status(self) -> dict[str, bool]: """Return robot status. diff --git a/armctl/universal_robots/universal_robots.py b/armctl/universal_robots/universal_robots.py index 8cecc28..9a0c1df 100644 --- a/armctl/universal_robots/universal_robots.py +++ b/armctl/universal_robots/universal_robots.py @@ -6,6 +6,7 @@ from armctl.templates import SocketController as SCT from armctl.templates.logger import logger from armctl.utils import CommandCheck as cc +from .protocols.rtde import RTDE ### Notes ### # Command Format: CMD(args)\n @@ -13,57 +14,7 @@ class UniversalRobots(SCT, Commands, Properties): - def _check_rtde(self): - try: - from .protocols.rtde import RTDE - except ImportError: - import os - import shutil - import sys - from subprocess import run - - logger.warning( - "RTDE Python Client Library not found. Attempting installation..." - ) - - # Determine installer - if shutil.which("uv") and os.environ.get("VIRTUAL_ENV"): - install_cmd = [ - "uv", - "add", - "--quiet", - "urrtde@git+https://github.com/UniversalRobots/RTDE_Python_Client_Library.git@main", - ] - elif shutil.which("poetry") and os.environ.get("POETRY_ACTIVE"): - install_cmd = [ - "poetry", - "add", - "git+https://github.com/UniversalRobots/RTDE_Python_Client_Library.git@main", - ] - else: - install_cmd = [ - sys.executable, - "-m", - "pip", - "install", - "--quiet", - "git+https://github.com/UniversalRobots/RTDE_Python_Client_Library.git@main", - ] - - try: - run(install_cmd, check=True) - except Exception as e: - logger.error( - f"Failed to install RTDE Python Client Library: {e}" - ) - raise ImportError( - "Could not install RTDE Python Client Library." - ) from e - - # Try import again - def __init__(self, ip: str, port: int | tuple[int, int] = 30_002): - self._check_rtde() super().__init__(ip, port) self.JOINT_RANGES = [ (-2 * math.pi, 2 * math.pi), @@ -80,7 +31,6 @@ def __init__(self, ip: str, port: int | tuple[int, int] = 30_002): def connect(self): super().connect() - from .protocols.rtde import RTDE self.rtde = RTDE(self.ip) # Initialize RTDE connection @@ -203,6 +153,32 @@ def get_cartesian_position(self) -> list[float]: logger.receive(f"Received response: {pose}") return pose + def get_joint_torques(self) -> list[float]: + """ + Get the current joint torques of the robot. + + Returns + ------- + list of float + Joint torques in Nm [T1, T2, T3, T4, T5, T6]. + """ + torques = self.rtde.joint_torques() + logger.receive(f"Received response: {torques}") + return torques + + def get_tcp_forces(self) -> list[float]: + """ + Get the current TCP forces of the robot. + + Returns + ------- + list of float + TCP forces [Fx, Fy, Fz, Tx, Ty, Tz] in Newton and Newton-meters. + """ + forces = self.rtde.tcp_force() + logger.receive(f"Received response: {forces}") + return forces + def stop_motion(self) -> None: deceleration = 2.0 # rad/s^2 self.send_command(f"stopj({deceleration})\n", suppress_output=True)