From 671eeb7f4759963bcb5d4092a06fc6219577ef33 Mon Sep 17 00:00:00 2001 From: ChipsICU Date: Thu, 3 Jul 2025 22:19:43 +0800 Subject: [PATCH] add deploy code on g1 --- deploy_real/ReadMe.md | 7 + deploy_real/common/command_helper.py | 60 +++++ deploy_real/common/motion_lib_helper.py | 19 ++ deploy_real/common/remote_controller.py | 39 +++ deploy_real/common/rotation_helper.py | 25 ++ deploy_real/config.py | 45 ++++ deploy_real/configs/g1_23.yaml | 58 +++++ deploy_real/deploy_real.py | 333 ++++++++++++++++++++++++ 8 files changed, 586 insertions(+) create mode 100644 deploy_real/ReadMe.md create mode 100644 deploy_real/common/command_helper.py create mode 100644 deploy_real/common/motion_lib_helper.py create mode 100644 deploy_real/common/remote_controller.py create mode 100644 deploy_real/common/rotation_helper.py create mode 100644 deploy_real/config.py create mode 100644 deploy_real/configs/g1_23.yaml create mode 100644 deploy_real/deploy_real.py diff --git a/deploy_real/ReadMe.md b/deploy_real/ReadMe.md new file mode 100644 index 0000000..911ef8e --- /dev/null +++ b/deploy_real/ReadMe.md @@ -0,0 +1,7 @@ +# Deploy on G1 + +## Usage +Here is a usage example, for more information, check [unitree_rl_gym](https://github.com/unitreerobotics/unitree_rl_gym) +```bash +python deploy_real/deploy_real.py enp3s0 g1_23.yaml +``` \ No newline at end of file diff --git a/deploy_real/common/command_helper.py b/deploy_real/common/command_helper.py new file mode 100644 index 0000000..ed1b296 --- /dev/null +++ b/deploy_real/common/command_helper.py @@ -0,0 +1,60 @@ +from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowCmd_ as LowCmdGo +from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowCmd_ as LowCmdHG +from typing import Union + +class MotorMode: + PR = 0 # Series Control for Pitch/Roll Joints + AB = 1 # Parallel Control for A/B Joints + + +def create_damping_cmd(cmd: Union[LowCmdGo, LowCmdHG]): + size = len(cmd.motor_cmd) + for i in range(size): + cmd.motor_cmd[i].q = 0 + cmd.motor_cmd[i].qd = 0 + cmd.motor_cmd[i].kp = 0 + cmd.motor_cmd[i].kd = 8 + cmd.motor_cmd[i].tau = 0 + + +def create_zero_cmd(cmd: Union[LowCmdGo, LowCmdHG]): + size = len(cmd.motor_cmd) + for i in range(size): + cmd.motor_cmd[i].q = 0 + cmd.motor_cmd[i].qd = 0 + cmd.motor_cmd[i].kp = 0 + cmd.motor_cmd[i].kd = 0 + cmd.motor_cmd[i].tau = 0 + + +def init_cmd_hg(cmd: LowCmdHG, mode_machine: int, mode_pr: int): + cmd.mode_machine = mode_machine + cmd.mode_pr = mode_pr + size = len(cmd.motor_cmd) + for i in range(size): + cmd.motor_cmd[i].mode = 1 + cmd.motor_cmd[i].q = 0 + cmd.motor_cmd[i].qd = 0 + cmd.motor_cmd[i].kp = 0 + cmd.motor_cmd[i].kd = 0 + cmd.motor_cmd[i].tau = 0 + + +def init_cmd_go(cmd: LowCmdGo, weak_motor: list): + cmd.head[0] = 0xFE + cmd.head[1] = 0xEF + cmd.level_flag = 0xFF + cmd.gpio = 0 + PosStopF = 2.146e9 + VelStopF = 16000.0 + size = len(cmd.motor_cmd) + for i in range(size): + if i in weak_motor: + cmd.motor_cmd[i].mode = 1 + else: + cmd.motor_cmd[i].mode = 0x0A + cmd.motor_cmd[i].q = PosStopF + cmd.motor_cmd[i].qd = VelStopF + cmd.motor_cmd[i].kp = 0 + cmd.motor_cmd[i].kd = 0 + cmd.motor_cmd[i].tau = 0 diff --git a/deploy_real/common/motion_lib_helper.py b/deploy_real/common/motion_lib_helper.py new file mode 100644 index 0000000..9c356bb --- /dev/null +++ b/deploy_real/common/motion_lib_helper.py @@ -0,0 +1,19 @@ +import joblib + +def get_motion_len(motion_file_path: str): + """ + Get the length of the motion from a motion file. + + Args: + motion_file_path (str): Path to the motion file. + + Returns: + int: Length of the motion. + """ + motion_data = joblib.load(motion_file_path) + motion_data = motion_data[list(motion_data.keys())[0]] + fps = motion_data["fps"] + dt = 1.0 / fps + num_frames = motion_data["root_rot"].shape[0] + motion_len = dt * (num_frames - 1) + return motion_len \ No newline at end of file diff --git a/deploy_real/common/remote_controller.py b/deploy_real/common/remote_controller.py new file mode 100644 index 0000000..ff50308 --- /dev/null +++ b/deploy_real/common/remote_controller.py @@ -0,0 +1,39 @@ +import struct + + +class KeyMap: + R1 = 0 + L1 = 1 + start = 2 + select = 3 + R2 = 4 + L2 = 5 + F1 = 6 + F2 = 7 + A = 8 + B = 9 + X = 10 + Y = 11 + up = 12 + right = 13 + down = 14 + left = 15 + + +class RemoteController: + def __init__(self): + self.lx = 0 + self.ly = 0 + self.rx = 0 + self.ry = 0 + self.button = [0] * 16 + + def set(self, data): + # wireless_remote + keys = struct.unpack("H", data[2:4])[0] + for i in range(16): + self.button[i] = (keys & (1 << i)) >> i + self.lx = struct.unpack("f", data[4:8])[0] + self.rx = struct.unpack("f", data[8:12])[0] + self.ry = struct.unpack("f", data[12:16])[0] + self.ly = struct.unpack("f", data[20:24])[0] diff --git a/deploy_real/common/rotation_helper.py b/deploy_real/common/rotation_helper.py new file mode 100644 index 0000000..bcf131c --- /dev/null +++ b/deploy_real/common/rotation_helper.py @@ -0,0 +1,25 @@ +import numpy as np +from scipy.spatial.transform import Rotation as R + + +def get_gravity_orientation(quaternion): + qw = quaternion[0] + qx = quaternion[1] + qy = quaternion[2] + qz = quaternion[3] + + gravity_orientation = np.zeros(3) + + gravity_orientation[0] = 2 * (-qz * qx + qw * qy) + gravity_orientation[1] = -2 * (qz * qy + qw * qx) + gravity_orientation[2] = 1 - 2 * (qw * qw + qz * qz) + + return gravity_orientation + + +def transform_imu_data(waist_yaw, waist_yaw_omega, imu_quat, imu_omega): + RzWaist = R.from_euler("z", waist_yaw).as_matrix() + R_torso = R.from_quat([imu_quat[1], imu_quat[2], imu_quat[3], imu_quat[0]]).as_matrix() + R_pelvis = np.dot(R_torso, RzWaist.T) + w = np.dot(RzWaist, imu_omega[0]) - np.array([0, 0, waist_yaw_omega]) + return R.from_matrix(R_pelvis).as_quat()[[3, 0, 1, 2]], w diff --git a/deploy_real/config.py b/deploy_real/config.py new file mode 100644 index 0000000..40b66b0 --- /dev/null +++ b/deploy_real/config.py @@ -0,0 +1,45 @@ +import numpy as np +import yaml + +class Config: + def __init__(self, file_path) -> None: + with open(file_path, "r") as f: + config = yaml.load(f, Loader=yaml.FullLoader) + + self.control_dt = config["control_dt"] + + self.msg_type = config["msg_type"] + self.imu_type = config["imu_type"] + + self.weak_motor = [] + if "weak_motor" in config: + self.weak_motor = config["weak_motor"] + + self.lowcmd_topic = config["lowcmd_topic"] + self.lowstate_topic = config["lowstate_topic"] + + self.policy_path = config["policy_path"] + + self.leg_joint2motor_idx = config["leg_joint2motor_idx"] + self.kps = config["kps"] + self.kds = config["kds"] + self.default_angles = np.array(config["default_angles"], dtype=np.float32) + + self.arm_waist_joint2motor_idx = config["arm_waist_joint2motor_idx"] + self.arm_waist_kps = config["arm_waist_kps"] + self.arm_waist_kds = config["arm_waist_kds"] + self.arm_waist_target = np.array(config["arm_waist_target"], dtype=np.float32) + + self.ang_vel_scale = config["ang_vel_scale"] + self.dof_pos_scale = config["dof_pos_scale"] + self.dof_vel_scale = config["dof_vel_scale"] + self.action_scale = config["action_scale"] + self.cmd_scale = np.array(config["cmd_scale"], dtype=np.float32) + self.max_cmd = np.array(config["max_cmd"], dtype=np.float32) + + self.num_actions = config["num_actions"] + self.num_obs = config["num_obs"] + + self.frame_stack = config.get("frame_stack", 5) + self.motion_file = config["motion_file"] + self.clip_action_limit = config["clip_action_limit"] \ No newline at end of file diff --git a/deploy_real/configs/g1_23.yaml b/deploy_real/configs/g1_23.yaml new file mode 100644 index 0000000..8133abb --- /dev/null +++ b/deploy_real/configs/g1_23.yaml @@ -0,0 +1,58 @@ +control_dt: 0.02 + +msg_type: "hg" # "hg" or "go" +imu_type: "pelvis" # "torso" or "pelvis" + +lowcmd_topic: "rt/lowcmd" +lowstate_topic: "rt/lowstate" + +policy_path: "example/pretrained_horse_stance_punch/exported/model_33000.onnx" +motion_file: "example/motion_data/Horse-stance_punch.pkl" + +leg_joint2motor_idx: [0, 1, 2, 3, 4, 5, + 6, 7, 8, 9, 10, 11, + 12, 13, 14, + 15, 16, 17, 18, + 22, 23, 24, 25, + ] +kps: [100, 100, 100, 150, 40, 40, + 100, 100, 100, 150, 40, 40, + 400, 400, 400, + 100, 100, 50, 50, + 100, 100, 50, 50, + ] +kds: [2, 2, 2, 4, 2, 2, + 2, 2, 2, 4, 2, 2, + 5, 5, 5, + 2, 2, 2, 2, + 2, 2, 2, 2, + ] +default_angles: [-0.1, 0.0, 0.0, 0.3, -0.2, 0.0, + -0.1, 0.0, 0.0, 0.3, -0.2, 0.0, + 0.0, 0.0, 0.0, + 0.2, 0.2, 0.0, 0.9, + 0.2, -0.2, 0.0, 0.9, + ] + +arm_waist_joint2motor_idx: [19, 20, 21, + 26, 27, 28] + +arm_waist_kps: [20, 20, 20, + 20, 20, 20,] + +arm_waist_kds: [1, 1, 1, + 1, 1, 1] + +arm_waist_target: [ 0.0, 0.0, 0.0, + 0.0, 0.0, 0.0,] +ang_vel_scale: 0.25 +dof_pos_scale: 1.0 +dof_vel_scale: 0.05 +action_scale: 0.25 +cmd_scale: [2.0, 2.0, 0.25] +num_actions: 23 +num_obs: 76 +frame_stack: 5 +clip_action_limit: 100.0 + +max_cmd: [1., 0., 1] \ No newline at end of file diff --git a/deploy_real/deploy_real.py b/deploy_real/deploy_real.py new file mode 100644 index 0000000..4205970 --- /dev/null +++ b/deploy_real/deploy_real.py @@ -0,0 +1,333 @@ +from typing import Union +import numpy as np +import time +import torch + +from unitree_sdk2py.core.channel import ChannelPublisher, ChannelFactoryInitialize +from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize +from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowCmd_, unitree_hg_msg_dds__LowState_ +from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowCmd_, unitree_go_msg_dds__LowState_ +from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowCmd_ as LowCmdHG +from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowCmd_ as LowCmdGo +from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowState_ as LowStateHG +from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowState_ as LowStateGo +from unitree_sdk2py.utils.crc import CRC + +from common.command_helper import ( + create_damping_cmd, + create_zero_cmd, + init_cmd_hg, + init_cmd_go, + MotorMode + ) +from common.rotation_helper import get_gravity_orientation, transform_imu_data +from common.remote_controller import RemoteController, KeyMap +from common.motion_lib_helper import get_motion_len +from config import Config +from collections import deque +import onnxruntime as ort + + +class Controller: + def __init__(self, config: Config) -> None: + self.config = config + self.frame_stack = self.config.frame_stack + + self.motion_file = self.config.motion_file + self.motion_len = get_motion_len(self.motion_file) + + self.remote_controller = RemoteController() + + # Initialize the policy network + # self.policy = torch.jit.load(config.policy_path) + self.policy = ort.InferenceSession(config.policy_path) + # Initializing process variables + self.qj = np.zeros(config.num_actions, dtype=np.float32) + self.dqj = np.zeros(config.num_actions, dtype=np.float32) + self.action = np.zeros(config.num_actions, dtype=np.float32) + self.target_dof_pos = config.default_angles.copy() + self.obs = np.zeros(config.num_obs, dtype=np.float32) + self.counter = 0 + + if config.msg_type == "hg": + # g1 and h1_2 use the hg msg type + self.low_cmd = unitree_hg_msg_dds__LowCmd_() + self.low_state = unitree_hg_msg_dds__LowState_() + self.mode_pr_ = MotorMode.PR + self.mode_machine_ = 0 + + self.lowcmd_publisher_ = ChannelPublisher(config.lowcmd_topic, LowCmdHG) + self.lowcmd_publisher_.Init() + + self.lowstate_subscriber = ChannelSubscriber(config.lowstate_topic, LowStateHG) + self.lowstate_subscriber.Init(self.LowStateHgHandler, 10) + + elif config.msg_type == "go": + # h1 uses the go msg type + self.low_cmd = unitree_go_msg_dds__LowCmd_() + self.low_state = unitree_go_msg_dds__LowState_() + + self.lowcmd_publisher_ = ChannelPublisher(config.lowcmd_topic, LowCmdGo) + self.lowcmd_publisher_.Init() + + self.lowstate_subscriber = ChannelSubscriber(config.lowstate_topic, LowStateGo) + self.lowstate_subscriber.Init(self.LowStateGoHandler, 10) + + else: + raise ValueError("Invalid msg_type") + + # wait for the subscriber to receive data + self.wait_for_low_state() + + # Initialize the command msg + if config.msg_type == "hg": + init_cmd_hg(self.low_cmd, self.mode_machine_, self.mode_pr_) + elif config.msg_type == "go": + init_cmd_go(self.low_cmd, weak_motor=self.config.weak_motor) + + self.start_time = time.time() + + # Initialize histories for each observation type + self.history = { + "action": deque(maxlen=self.frame_stack-1), + "omega": deque(maxlen=self.frame_stack-1), + "qj": deque(maxlen=self.frame_stack-1), + "dqj": deque(maxlen=self.frame_stack-1), + "gravity_orientation": deque(maxlen=self.frame_stack-1), + "ref_motion_phase": deque(maxlen=self.frame_stack-1), + } + + for _ in range(self.frame_stack - 1): + for key in self.history: + if key in ["action", "qj", "dqj"]: + self.history[key].append(torch.zeros(1, self.config.num_actions, dtype=torch.float)) + elif key in ["omega", "gravity_orientation"]: + self.history[key].append(torch.zeros(1, 3, dtype=torch.float)) + elif key == "ref_motion_phase": + self.history[key].append(torch.zeros(1, 1, dtype=torch.float)) + else: + raise ValueError(f"Not Implement: {key}") + + def LowStateHgHandler(self, msg: LowStateHG): + self.low_state = msg + self.mode_machine_ = self.low_state.mode_machine + self.remote_controller.set(self.low_state.wireless_remote) + + def LowStateGoHandler(self, msg: LowStateGo): + self.low_state = msg + self.remote_controller.set(self.low_state.wireless_remote) + + def send_cmd(self, cmd: Union[LowCmdGo, LowCmdHG]): + cmd.crc = CRC().Crc(cmd) + self.lowcmd_publisher_.Write(cmd) + + def wait_for_low_state(self): + while self.low_state.tick == 0: + time.sleep(self.config.control_dt) + print("Successfully connected to the robot.") + + def zero_torque_state(self): + print("Enter zero torque state.") + print("Waiting for the start signal...") + while self.remote_controller.button[KeyMap.start] != 1: + create_zero_cmd(self.low_cmd) + self.send_cmd(self.low_cmd) + time.sleep(self.config.control_dt) + + def move_to_default_pos(self): + print("Moving to default pos.") + # move time 2s + total_time = 5 + num_step = int(total_time / self.config.control_dt) + + dof_idx = self.config.leg_joint2motor_idx + self.config.arm_waist_joint2motor_idx + kps = self.config.kps + self.config.arm_waist_kps + kds = self.config.kds + self.config.arm_waist_kds + self.default_pos = np.concatenate((self.config.default_angles, self.config.arm_waist_target), axis=0) + self.default_angles = self.default_pos[0:23] + dof_size = len(dof_idx) + + # record the current pos + init_dof_pos = np.zeros(dof_size, dtype=np.float32) + for i in range(dof_size): + init_dof_pos[i] = self.low_state.motor_state[dof_idx[i]].q + + # move to default pos + for i in range(num_step): + alpha = i / num_step + for j in range(dof_size): + motor_idx = dof_idx[j] + target_pos = self.default_pos[j] + self.low_cmd.motor_cmd[motor_idx].q = init_dof_pos[j] * (1 - alpha) + target_pos * alpha + self.low_cmd.motor_cmd[motor_idx].qd = 0 + self.low_cmd.motor_cmd[motor_idx].kp = kps[j] + self.low_cmd.motor_cmd[motor_idx].kd = kds[j] + self.low_cmd.motor_cmd[motor_idx].tau = 0 + self.send_cmd(self.low_cmd) + time.sleep(self.config.control_dt) + + def default_pos_state(self): + print("Enter default pos state.") + print("Waiting for the Button A signal...") + while self.remote_controller.button[KeyMap.A] != 1: + for i in range(len(self.config.leg_joint2motor_idx)): + motor_idx = self.config.leg_joint2motor_idx[i] + self.low_cmd.motor_cmd[motor_idx].q = self.default_angles[i] + self.low_cmd.motor_cmd[motor_idx].qd = 0 + self.low_cmd.motor_cmd[motor_idx].kp = self.config.kps[i] + self.low_cmd.motor_cmd[motor_idx].kd = self.config.kds[i] + self.low_cmd.motor_cmd[motor_idx].tau = 0 + for i in range(len(self.config.arm_waist_joint2motor_idx)): + motor_idx = self.config.arm_waist_joint2motor_idx[i] + self.low_cmd.motor_cmd[motor_idx].q = self.config.arm_waist_target[i] + self.low_cmd.motor_cmd[motor_idx].qd = 0 + self.low_cmd.motor_cmd[motor_idx].kp = self.config.arm_waist_kps[i] + self.low_cmd.motor_cmd[motor_idx].kd = self.config.arm_waist_kds[i] + self.low_cmd.motor_cmd[motor_idx].tau = 0 + self.send_cmd(self.low_cmd) + time.sleep(self.config.control_dt) + + + def run(self): + + self.counter += 1 + # Get the current joint position and velocity + for i in range(len(self.config.leg_joint2motor_idx)): + self.qj[i] = self.low_state.motor_state[self.config.leg_joint2motor_idx[i]].q + self.dqj[i] = self.low_state.motor_state[self.config.leg_joint2motor_idx[i]].dq + + # imu_state quaternion: w, x, y, z + quat = self.low_state.imu_state.quaternion + ang_vel = np.array([self.low_state.imu_state.gyroscope], dtype=np.float32) + + if self.config.imu_type == "torso": + # h1 and h1_2 imu is on the torso + # imu data needs to be transformed to the pelvis frame + waist_yaw = self.low_state.motor_state[self.config.arm_waist_joint2motor_idx[0]].q + waist_yaw_omega = self.low_state.motor_state[self.config.arm_waist_joint2motor_idx[0]].dq + quat, ang_vel = transform_imu_data(waist_yaw=waist_yaw, waist_yaw_omega=waist_yaw_omega, imu_quat=quat, imu_omega=ang_vel) + + # create observation + + # 1. Gather individual history tensors + action_hist_tensor = torch.cat([self.history["action"][i] for i in range(self.frame_stack-1)], dim=1) + omega_hist_tensor = torch.cat([self.history["omega"][i] for i in range(self.frame_stack-1)], dim=1) + qj_hist_tensor = torch.cat([self.history["qj"][i] for i in range(self.frame_stack-1)], dim=1) + dqj_hist_tensor = torch.cat([self.history["dqj"][i] for i in range(self.frame_stack-1)], dim=1) + gravity_orientation_hist_tensor = torch.cat([self.history["gravity_orientation"][i] for i in range(self.frame_stack-1)], dim=1) + ref_motion_phase_hist_tensor = torch.cat([self.history["ref_motion_phase"][i] for i in range(self.frame_stack-1)], dim=1) + + # 2. Concatenate all parts into a single observation tensor + obs_hist = torch.cat([ + action_hist_tensor, + omega_hist_tensor, + qj_hist_tensor, + dqj_hist_tensor, + gravity_orientation_hist_tensor, + ref_motion_phase_hist_tensor + ], dim=1) + + # 3. Get the current observation + gravity_orientation = get_gravity_orientation(quat) + qj_obs = self.qj.copy() + dqj_obs = self.dqj.copy() + qj_obs = (qj_obs - self.default_angles) * self.config.dof_pos_scale + dqj_obs = dqj_obs * self.config.dof_vel_scale + ang_vel = ang_vel * self.config.ang_vel_scale + ref_motion_phase = ((self.counter * self.config.control_dt) % self.motion_len) / self.motion_len + + num_actions = self.config.num_actions + + curr_obs = np.zeros(self.config.num_obs, dtype=np.float32) + curr_obs[: num_actions] = self.action + curr_obs[num_actions: num_actions + 3] = ang_vel + curr_obs[num_actions + 3: 2 * num_actions + 3] = qj_obs + curr_obs[2 * num_actions + 3: 3 * num_actions + 3] = dqj_obs + curr_obs[3 * num_actions + 3: 3 * num_actions + 6] = gravity_orientation + curr_obs[6 + 3 * num_actions] = ref_motion_phase + + curr_obs_tensor = torch.from_numpy(curr_obs).unsqueeze(0) + + # 4. Get obs buffer, the order is key's alphabetical order + self.obs_buf = torch.cat([ + curr_obs_tensor[:, :3 * num_actions + 3], + obs_hist, + curr_obs_tensor[:, 3 * num_actions + 3:]], + dim=1 + ) + + # 5. Update the history + self.history["action"].appendleft(curr_obs_tensor[:, :num_actions]) + self.history["omega"].appendleft(curr_obs_tensor[:, num_actions:num_actions+3]) + self.history["qj"].appendleft(curr_obs_tensor[:, num_actions+3:num_actions+3+num_actions]) + self.history["dqj"].appendleft(curr_obs_tensor[:, num_actions+3+num_actions:num_actions+3+2*num_actions]) + self.history["gravity_orientation"].appendleft(curr_obs_tensor[:, num_actions+3+2*num_actions:num_actions+3+2*num_actions+3]) + self.history["ref_motion_phase"].appendleft(curr_obs_tensor[:, -1].unsqueeze(0)) + + # 6. Get policy's inference + input_name = self.policy.get_inputs()[0].name + outputs = self.policy.run(None, {input_name: self.obs_buf.numpy()}) + self.action = outputs[0].squeeze() + target_dof_pos = self.default_angles + self.action * self.config.action_scale + + # Build low cmd + for i in range(len(self.config.leg_joint2motor_idx)): + motor_idx = self.config.leg_joint2motor_idx[i] + self.low_cmd.motor_cmd[motor_idx].q = target_dof_pos[i] + self.low_cmd.motor_cmd[motor_idx].qd = 0 + self.low_cmd.motor_cmd[motor_idx].kp = self.config.kps[i] + self.low_cmd.motor_cmd[motor_idx].kd = self.config.kds[i] + self.low_cmd.motor_cmd[motor_idx].tau = 0 + + for i in range(len(self.config.arm_waist_joint2motor_idx)): + motor_idx = self.config.arm_waist_joint2motor_idx[i] + self.low_cmd.motor_cmd[motor_idx].q = self.config.arm_waist_target[i] + self.low_cmd.motor_cmd[motor_idx].qd = 0 + self.low_cmd.motor_cmd[motor_idx].kp = self.config.arm_waist_kps[i] + self.low_cmd.motor_cmd[motor_idx].kd = self.config.arm_waist_kds[i] + self.low_cmd.motor_cmd[motor_idx].tau = 0 + + # send the command + self.send_cmd(self.low_cmd) + + time.sleep(self.config.control_dt) + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument("net", type=str, help="network interface") + parser.add_argument("config", type=str, help="config file name in the configs folder", default="g1.yaml") + args = parser.parse_args() + + # Load config + config_path = f"deploy_real/configs/{args.config}" + config = Config(config_path) + + # Initialize DDS communication + ChannelFactoryInitialize(0, args.net) + + controller = Controller(config) + + # Enter the zero torque state, press the start key to continue executing + controller.zero_torque_state() + + # Move to the default position + controller.move_to_default_pos() + + # Enter the default position state, press the A key to continue executing + controller.default_pos_state() + + while True: + try: + controller.run() + # Press the select key to exit + if controller.remote_controller.button[KeyMap.select] == 1: + break + except KeyboardInterrupt: + break + # Enter the damping state + create_damping_cmd(controller.low_cmd) + controller.send_cmd(controller.low_cmd) + print("Exit")