diff --git a/examples/robosuite/custom_mjcf_table.scenic b/examples/robosuite/custom_mjcf_table.scenic index 28e3188c9..2717b8f54 100644 --- a/examples/robosuite/custom_mjcf_table.scenic +++ b/examples/robosuite/custom_mjcf_table.scenic @@ -1,3 +1,4 @@ +# examples/robosuite/custom_mjcf_table.scenic """Test with two XML tables and XML cubes on top, positioned in front of robot.""" model scenic.simulators.robosuite.model @@ -68,19 +69,19 @@ green_cube_xml = """ # Create the first table (brown) - closer to robot table1 = new CustomObject on arena_floor, - with mjcf_xml table_xml + with mjcfXml table_xml # Create the second table (blue) - farther from robot table2 = new CustomObject on arena_floor, - with mjcf_xml table_xml + with mjcfXml table_xml red_cube = new CustomObject on table1, - with mjcf_xml red_cube_xml + with mjcfXml red_cube_xml # Place green cube on second table green_cube = new CustomObject on table2, - with mjcf_xml green_cube_xml + with mjcfXml green_cube_xml # Robot at origin ego = new Panda at (0, 0, 0) \ No newline at end of file diff --git a/examples/robosuite/file_xml.scenic b/examples/robosuite/file_xml.scenic index 0cb6a2c90..1a7897c12 100644 --- a/examples/robosuite/file_xml.scenic +++ b/examples/robosuite/file_xml.scenic @@ -12,5 +12,5 @@ table1 = new Table on arena_floor # Load bread object from XML file bread = new CustomObject on table1, - with mjcf_xml "custom_object/bread.xml", + with mjcfXml "custom_object/bread.xml", with color (0, 1, 1, 1) \ No newline at end of file diff --git a/examples/robosuite/four_table_workspace.scenic b/examples/robosuite/four_table_workspace.scenic deleted file mode 100644 index 871a581b5..000000000 --- a/examples/robosuite/four_table_workspace.scenic +++ /dev/null @@ -1,43 +0,0 @@ -''' -This example is not working, fix: WIP. - -''' - -# examples/robosuite/four_table_workspace.scenic -model scenic.simulators.robosuite.model - -# CAMERA CONFIGURATION -# param camera_view = "sideview" - -TABLE_DISTANCE = 1.0 - -# WORKSPACE: Four tables in cross formation -back_table = new Table at (-TABLE_DISTANCE, 0, 0.425) -front_table = new Table at (TABLE_DISTANCE, 0, 0.425) -right_table = new Table at (0, TABLE_DISTANCE, 0.425) -left_table = new Table at (0, -TABLE_DISTANCE, 0.425) - -# TABLE 1 (Back): Two primitive objects with explicit spacing -ball = new Ball at (-TABLE_DISTANCE - 0.2, 0, 0.9), - with color (1, 0.5, 0, 1) - -box = new Box at (-TABLE_DISTANCE + 0.2, 0, 0.9), - with color (0, 0, 1, 1) - -# TABLE 2 (Front): Two nuts -square_nut = new SquareNut at (TABLE_DISTANCE - 0.2, 0, 0.9) - -round_nut = new RoundNut at (TABLE_DISTANCE + 0.2, 0, 0.9) - -# TABLE 3 (Right): Two food items -can = new Can at (0, TABLE_DISTANCE - 0.2, 0.9) - -bread = new Bread at (0, TABLE_DISTANCE + 0.2, 0.9) - -# TABLE 4 (Left): Two more objects -milk = new Milk at (0, -TABLE_DISTANCE - 0.2, 0.9) - -bottle = new Bottle at (0, -TABLE_DISTANCE + 0.2, 0.9) - -# ROBOT -ego = new Panda at (0, 0, 0) \ No newline at end of file diff --git a/examples/robosuite/full_xml_test3.scenic b/examples/robosuite/full_xml_test3.scenic index 63c40c43b..72f83ffa8 100644 --- a/examples/robosuite/full_xml_test3.scenic +++ b/examples/robosuite/full_xml_test3.scenic @@ -66,7 +66,7 @@ arena = new CustomArena, # Create a yellow ball using custom XML yellow_ball = new CustomObject at (0.5, 0.0, 1.0), - with mjcf_xml ball_object_xml + with mjcfXml ball_object_xml # Create a red box using built-in Box class for comparison red_box = new Box at (0.7, 0.2, 1.0), diff --git a/examples/robosuite/mesh_test.scenic b/examples/robosuite/mesh_test.scenic index e5a28525f..5a6494e3a 100644 --- a/examples/robosuite/mesh_test.scenic +++ b/examples/robosuite/mesh_test.scenic @@ -42,7 +42,7 @@ table = new Table on arena_floor # Create dragon - path in XML is relative to this .scenic file dragon = new CustomObject on table, - with mjcf_xml dragon_object_xml, + with mjcfXml dragon_object_xml, with color (1, 0, 1, 1) # Add a simple ball for comparison diff --git a/examples/robosuite/spatial_operators_demo.scenic b/examples/robosuite/spatial_operators_demo.scenic index 47d5caf39..cb26b62e2 100644 --- a/examples/robosuite/spatial_operators_demo.scenic +++ b/examples/robosuite/spatial_operators_demo.scenic @@ -57,7 +57,7 @@ custom_xml = """ # Yellow block at table surface yellow_block = new CustomObject at (0.5, 0.2, 0.85), - with mjcf_xml custom_xml + with mjcfXml custom_xml # Complex object (Milk carton) at table surface milk = new Milk at (0.8, -0.1, 0.85) diff --git a/examples/robosuite/stack_lift.scenic b/examples/robosuite/stack_lift.scenic index ca268d63c..c3ac17491 100644 --- a/examples/robosuite/stack_lift.scenic +++ b/examples/robosuite/stack_lift.scenic @@ -1,19 +1,18 @@ # examples/robosuite/stack_lift.scenic model scenic.simulators.robosuite.model -work_table = new Table at (0.6, 0, 0.8), +work_table = new Table on (0.6, 0, 0.8), with width 0.6, with length 1.2, with height 0.05 -bottom_cube = new Box at (0.6, 0, 0.83), +bottom_cube = new Box on work_table, with color (0.2, 0.3, 0.8, 1), with width 0.06, with length 0.06, with height 0.06 -top_cube = new Box at (0.6, 0, 0.89), +top_cube = new Box on bottom_cube, with color (0.8, 0.2, 0.2, 1) -bottle = new Bottle at (0.6, 0.3, 0.83) pickup_object = top_cube @@ -25,5 +24,7 @@ behavior StackLift(): if pickup_object.position.z > 1.0: terminate simulation wait -ego = new UR5e at (0, 0, 0), + terminate simulation + +ego = new UR5e on (0, 0, 0), with behavior StackLift() \ No newline at end of file diff --git a/src/scenic/simulators/robosuite/model.scenic b/src/scenic/simulators/robosuite/model.scenic index 599ae34c3..a60db55d2 100644 --- a/src/scenic/simulators/robosuite/model.scenic +++ b/src/scenic/simulators/robosuite/model.scenic @@ -1,6 +1,27 @@ -"""Scenic world model for RoboSuite with MJCF mesh extraction.""" +"""Scenic world model for RoboSuite robotic manipulation. + +The model supports various robots (Panda, UR5e, Jaco, IIWA), manipulable objects +(primitives and custom MJCF), and tables. It provides behaviors for pick-and-place +tasks and integrates with RoboSuite's physics simulation. + +Global Parameters: + env_config (dict): Additional RoboSuite environment configuration options. + Default is empty dict. + controller_config (dict or None): Robot controller configuration. If None, + uses RoboSuite's default controller for the robot type. Default is None. + camera_view (str or None): Camera viewpoint for rendering. Options include + 'frontview', 'birdview', 'agentview', 'sideview', 'robot0_robotview', + 'robot0_eye_in_hand'. Default is None (uses RoboSuite default). + render (bool): Whether to render the simulation visually. Default is True. + real_time (bool): Whether to run simulation in real-time. Default is True. + speed (float): Simulation speed multiplier when real_time is True. Default is 1.0. + lite_physics (bool or None): Whether to use simplified physics for faster + simulation. Default is None (uses RoboSuite default). + scenic_file_dir (Path): Directory containing the Scenic file, used for resolving + relative paths in MJCF XML files. Automatically set to localPath("."). +""" + -from .simulator import RobosuiteSimulator, SetJointPositions, OSCPositionAction from scenic.core.utils import repairMesh import trimesh import json @@ -9,6 +30,17 @@ import os from pathlib import Path import numpy as np +# At top of model.scenic +try: + from .simulator import RobosuiteSimulator, OSCPositionAction +except ImportError: + # Dummy classes for compilation without RoboSuite + class RobosuiteSimulator: + def __init__(self, **kwargs): + raise RuntimeError('RoboSuite is required to run simulations from this scenario') + + class OSCPositionAction: pass + # Global parameters param env_config = {} param controller_config = None @@ -24,9 +56,6 @@ _arena_config_path = localPath("utils/arena_meshes/arena_config.json") with open(_arena_config_path) as f: _arena_config = json.load(f) -_floor_surface_z = _arena_config['floor']['position'][2] + _arena_config['floor']['dimensions'][2] / 2 -param floor_surface_z = _floor_surface_z - # Simulator simulator RobosuiteSimulator( render=globalParameters.render, @@ -48,34 +77,10 @@ with open(_robot_dims_path) as f: dims = eval(dims_str.strip()) _robot_dimensions[robot] = dims -# Default values -DEFAULTS = { - 'object_size': 0.03, - 'density': 1000, - 'friction': (1.0, 0.005, 0.0001), - 'solref': (0.02, 1.0), - 'solimp': (0.9, 0.95, 0.001, 0.5, 2.0), - 'default_color': (0.5, 0.5, 0.5, 1.0), - - 'table_height': 0.8, - 'table_width': 0.8, - 'table_length': 0.8, - 'table_thickness': 0.05, - - 'control_gain': 3.0, - 'control_limit': 0.3, - 'position_tolerance': 0.02, - 'height_tolerance': 0.02, - 'gripper_open_steps': 20, - 'gripper_close_steps': 30, - 'max_control_steps': 100, - 'max_lift_steps': 200 -} - - -def _mjcf_to_shape(mjcf_xml: str, scenic_file_path: str = None) -> Shape: + +def _mjcf_to_shape(mjcfXml: str, scenic_file_path: str = None) -> Shape: """Convert MJCF XML to MeshShape via temp GLB file.""" - if not mjcf_xml: + if not mjcfXml: return BoxShape() import xml.etree.ElementTree as ET @@ -85,12 +90,12 @@ def _mjcf_to_shape(mjcf_xml: str, scenic_file_path: str = None) -> Shape: base_dir = scenic_file_path if scenic_file_path else os.getcwd() # Handle file path vs XML string - xml_content = mjcf_xml - if not mjcf_xml.strip().startswith('<'): - if not os.path.isabs(mjcf_xml): - xml_path = os.path.join(base_dir, mjcf_xml) + xml_content = mjcfXml + if not mjcfXml.strip().startswith('<'): + if not os.path.isabs(mjcfXml): + xml_path = os.path.join(base_dir, mjcfXml) else: - xml_path = mjcf_xml + xml_path = mjcfXml if os.path.exists(xml_path): with open(xml_path, 'r') as f: @@ -205,11 +210,10 @@ def _load_robot_mesh_shape(path): # Base classes class RoboSuiteObject(Object): """Base class for all RoboSuite objects.""" - density: DEFAULTS['density'] - friction: DEFAULTS['friction'] - solref: DEFAULTS['solref'] - solimp: DEFAULTS['solimp'] - shape: BoxShape() + density: 1000 + friction: (1.0, 0.005, 0.0001) + solref: (0.02, 1.0) + solimp: (0.9, 0.95, 0.001, 0.5, 2.0) allowCollisions: False requireVisible: False regionContainedIn: None @@ -242,156 +246,164 @@ arena_walls = new ArenaWalls on (0.5, 0.0, 0.0) class CustomArena(RoboSuiteObject): """Custom arena defined by complete MJCF XML.""" isCustomArena: True - arena_xml: "" + arenaXml: "" class Table(RoboSuiteObject): - """Table in environment.""" + """Table in environment - creates MultiTableArena. + + Note: Table dimensions have limitations due to RoboSuite's MultiTableArena: + - Height is fixed at 0.85m (full table) for Scenic collision detection + - RoboSuite only uses tabletop thickness (0.05m) internally + - Changing dimensions may cause visual inconsistencies between simulators + See table-rendering-issue.md for technical details. + """ isTable: True shape: MeshShape.fromFile(localPath("utils/table_meshes/standard_table.glb")) - width: DEFAULTS['table_width'] - length: DEFAULTS['table_length'] - height: 0.85 - position: (0, 0, 0.425) + width: 0.8 + length: 0.8 + height: 0.85 # Full table height for Scenic collision - DO NOT MODIFY + position: (0, 0, 0.425) # Center of table volume class ManipulationObject(RoboSuiteObject): """Base class for manipulable objects.""" - color: DEFAULTS['default_color'] + color: (0.5, 0.5, 0.5, 1.0) class CustomObject(ManipulationObject): """Custom object with automatic dimension extraction.""" objectType: "MJCF" - mjcf_xml: "" - shape: _mjcf_to_shape(self.mjcf_xml, globalParameters.scenic_file_dir) if self.mjcf_xml else BoxShape() + mjcfXml: "" + shape: _mjcf_to_shape(self.mjcfXml, globalParameters.scenic_file_dir) if self.mjcfXml else BoxShape() -# Primitive objects +# Primitive objects with makeRobosuiteObject methods class Box(ManipulationObject): """Box object.""" - objectType: "Box" - width: DEFAULTS['object_size'] - length: DEFAULTS['object_size'] - height: DEFAULTS['object_size'] + width: 0.03 + length: 0.03 + height: 0.03 + + def makeRobosuiteObject(self, name): + from robosuite.models.objects import BoxObject + return BoxObject( + name=name, + size=[self.width, self.length, self.height], + rgba=self.color + ) class Ball(ManipulationObject): """Ball/sphere object.""" - objectType: "Ball" - radius: DEFAULTS['object_size'] - width: DEFAULTS['object_size'] * 2 - length: DEFAULTS['object_size'] * 2 - height: DEFAULTS['object_size'] * 2 + radius: 0.03 + width: 0.06 + length: 0.06 + height: 0.06 + + def makeRobosuiteObject(self, name): + from robosuite.models.objects import BallObject + return BallObject( + name=name, + size=[self.radius], + rgba=self.color + ) class Cylinder(ManipulationObject): """Cylinder object.""" - objectType: "Cylinder" - width: DEFAULTS['object_size'] * 2 - length: DEFAULTS['object_size'] * 2 - height: DEFAULTS['object_size'] * 4 + width: 0.06 + length: 0.06 + height: 0.12 + + def makeRobosuiteObject(self, name): + from robosuite.models.objects import CylinderObject + radius = (self.width + self.length) / 4 + return CylinderObject( + name=name, + size=[radius, self.height], + rgba=self.color + ) class Capsule(ManipulationObject): """Capsule object.""" - objectType: "Capsule" - width: DEFAULTS['object_size'] * 1.5 - length: DEFAULTS['object_size'] * 1.5 - height: DEFAULTS['object_size'] * 3 - -# Complex objects -class Milk(ManipulationObject): - objectType: "Milk" - -class Cereal(ManipulationObject): - objectType: "Cereal" - -class Can(ManipulationObject): - objectType: "Can" - -class Bread(ManipulationObject): - objectType: "Bread" - -class Bottle(ManipulationObject): - objectType: "Bottle" - -class Hammer(ManipulationObject): - objectType: "Hammer" - -class SquareNut(ManipulationObject): - objectType: "SquareNut" - -class RoundNut(ManipulationObject): - objectType: "RoundNut" + width: 0.045 + length: 0.045 + height: 0.09 + + def makeRobosuiteObject(self, name): + from robosuite.models.objects import CapsuleObject + radius = (self.width + self.length) / 4 + return CapsuleObject( + name=name, + size=[radius, self.height], + rgba=self.color + ) # Robots class Robot(RoboSuiteObject): """Base robot class.""" - robot_type: "Panda" - shape: BoxShape() - gripper_type: "default" - controller_config: None - initial_qpos: None - base_type: "default" + robotType: "Panda" + gripperType: "default" + controllerConfig: None + initialQpos: None + baseType: "default" width: 0.4 length: 0.4 height: 1.0 position: (0, 0, 0) - joint_positions[dynamic]: [] - eef_pos[dynamic]: [0, 0, 0] - gripper_state[dynamic]: [0, 0] + jointPositions[dynamic]: [] + eefPos[dynamic]: [0, 0, 0] + gripperState[dynamic]: [0, 0] class Panda(Robot): - robot_type: "Panda" + robotType: "Panda" shape: _load_robot_mesh_shape(localPath("utils/robot_meshes/Panda.glb")) width: _robot_dimensions.get("Panda", [0.4, 0.4, 1.0])[0] length: _robot_dimensions.get("Panda", [0.4, 0.4, 1.0])[1] height: _robot_dimensions.get("Panda", [0.4, 0.4, 1.0])[2] - gripper_type: "PandaGripper" + gripperType: "PandaGripper" class UR5e(Robot): - robot_type: "UR5e" + robotType: "UR5e" shape: _load_robot_mesh_shape(localPath("utils/robot_meshes/UR5e.glb")) width: _robot_dimensions.get("UR5e", [0.4, 0.4, 1.0])[0] length: _robot_dimensions.get("UR5e", [0.4, 0.4, 1.0])[1] height: _robot_dimensions.get("UR5e", [0.4, 0.4, 1.0])[2] - gripper_type: "Robotiq85Gripper" + gripperType: "Robotiq85Gripper" class Jaco(Robot): - robot_type: "Jaco" + robotType: "Jaco" shape: _load_robot_mesh_shape(localPath("utils/robot_meshes/Jaco.glb")) width: _robot_dimensions.get("Jaco", [0.4, 0.4, 1.0])[0] length: _robot_dimensions.get("Jaco", [0.4, 0.4, 1.0])[1] height: _robot_dimensions.get("Jaco", [0.4, 0.4, 1.0])[2] - gripper_type: "JacoThreeFingerGripper" + gripperType: "JacoThreeFingerGripper" class IIWA(Robot): - robot_type: "IIWA" + robotType: "IIWA" shape: _load_robot_mesh_shape(localPath("utils/robot_meshes/IIWA.glb")) width: _robot_dimensions.get("IIWA", [0.4, 0.4, 1.0])[0] length: _robot_dimensions.get("IIWA", [0.4, 0.4, 1.0])[1] height: _robot_dimensions.get("IIWA", [0.4, 0.4, 1.0])[2] - gripper_type: "Robotiq140Gripper" + gripperType: "Robotiq140Gripper" # Behaviors -behavior OpenGripper(steps=DEFAULTS['gripper_open_steps']): +behavior OpenGripper(steps=20): """Open gripper over multiple steps.""" for _ in range(steps): take OSCPositionAction(gripper=-1) -behavior CloseGripper(steps=DEFAULTS['gripper_close_steps']): +behavior CloseGripper(steps=30): """Close gripper over multiple steps.""" for _ in range(steps): take OSCPositionAction(gripper=1) -behavior MoveToPosition(target_pos, - tolerance=DEFAULTS['position_tolerance'], - max_steps=DEFAULTS['max_control_steps'], - gain=DEFAULTS['control_gain']): +behavior MoveToPosition(target_pos, tolerance=0.02, max_steps=100, gain=3.0): """Move end-effector to target position.""" for _ in range(max_steps): - eef_pos = self.eef_pos + eef_pos = self.eefPos error = [target_pos[i] - eef_pos[i] for i in range(3)] if sum(e**2 for e in error)**0.5 < tolerance: return - limit = DEFAULTS['control_limit'] + limit = 0.3 delta = [max(-limit, min(limit, e * gain)) for e in error] take OSCPositionAction(position_delta=delta, gripper=-1) @@ -409,17 +421,17 @@ behavior MoveToGrasp(target_object, grasp_offset=0.02): target_object.position.z - grasp_offset] do MoveToPosition(target_pos, tolerance=0.01) -behavior LiftToHeight(target_height=1.0, max_steps=DEFAULTS['max_lift_steps']): +behavior LiftToHeight(target_height=1.0, max_steps=200): """Lift to absolute height.""" for _ in range(max_steps): - eef_pos = self.eef_pos + eef_pos = self.eefPos error = [0, 0, target_height - eef_pos[2]] - if abs(error[2]) < DEFAULTS['height_tolerance']: + if abs(error[2]) < 0.02: return - limit = DEFAULTS['control_limit'] - gain = DEFAULTS['control_gain'] + limit = 0.3 + gain = 3.0 delta = [max(-limit, min(limit, e * gain)) for e in error] take OSCPositionAction(position_delta=delta, gripper=1) diff --git a/src/scenic/simulators/robosuite/simulator.py b/src/scenic/simulators/robosuite/simulator.py index 68a6d0591..ef569c8ce 100644 --- a/src/scenic/simulators/robosuite/simulator.py +++ b/src/scenic/simulators/robosuite/simulator.py @@ -2,29 +2,11 @@ from typing import Dict, List, Any, Optional, Union import numpy as np -import mujoco import os import tempfile import shutil import xml.etree.ElementTree as ET -try: - import robosuite as suite - from robosuite.environments.manipulation.manipulation_env import ManipulationEnv - from robosuite.models.arenas import Arena, EmptyArena, MultiTableArena - from robosuite.models.tasks import ManipulationTask - from robosuite.models.objects import ( - MujocoXMLObject, - BallObject, BoxObject, CylinderObject, CapsuleObject, - MilkObject, CerealObject, CanObject, BreadObject, - HammerObject, SquareNutObject, RoundNutObject, BottleObject - ) - from robosuite.utils.observables import Observable, sensor - from robosuite.utils.mjcf_utils import xml_path_completion -except ImportError as e: - suite = None - _import_error = e - from scenic.core.simulators import Simulation, Simulator from scenic.core.vectors import Vector from scenic.core.dynamics import Action @@ -46,543 +28,544 @@ } -class CustomXMLArena(Arena): - """Arena created from custom XML string or file.""" +# Deferred class definitions to avoid importing RoboSuite at module load +def _get_arena_class(): + """Get CustomXMLArena class (requires RoboSuite imports).""" + from robosuite.models.arenas import Arena + from robosuite.utils.mjcf_utils import xml_path_completion - def __init__(self, xml_string, scenic_file_path=None): - super().__init__(xml_path_completion("arenas/empty_arena.xml")) + class CustomXMLArena(Arena): + """Arena created from custom XML string or file.""" - # Handle file path vs XML string - if not xml_string.strip().startswith('<'): - base_dir = os.path.dirname(scenic_file_path) if scenic_file_path else os.getcwd() - if not os.path.isabs(xml_string): - xml_path = os.path.join(base_dir, xml_string) - else: - xml_path = xml_string + def __init__(self, xml_string, scenic_file_path=None): + super().__init__(xml_path_completion("arenas/empty_arena.xml")) - if os.path.exists(xml_path): - with open(xml_path, 'r') as f: - xml_string = f.read() - else: - raise FileNotFoundError(f"Arena XML file not found: {xml_path}") - - # Parse custom XML - custom_tree = ET.fromstring(xml_string) - - custom_worldbody = custom_tree.find(".//worldbody") - if custom_worldbody is None: - raise ValueError("Custom XML must have a worldbody element") - - # Replace worldbody content - for child in list(self.worldbody): - self.worldbody.remove(child) - - for element in custom_worldbody: - self.worldbody.append(element) - - # Merge assets - custom_asset = custom_tree.find(".//asset") - if custom_asset is not None and self.asset is not None: - for child in custom_asset: - self.asset.append(child) - - # Set floor reference - self.floor = self.worldbody.find(".//geom[@name='floor']") - if self.floor is None: - for geom in self.worldbody.findall(".//geom"): - if geom.get("type") == "plane": - self.floor = geom - break - - # Set table references if they exist - self.table_body = self.worldbody.find(".//body[@name='custom_table']") - self.table_top = self.worldbody.find(".//geom[@name='table_top']") + # Handle file path vs XML string + if not xml_string.strip().startswith('<'): + base_dir = os.path.dirname(scenic_file_path) if scenic_file_path else os.getcwd() + if not os.path.isabs(xml_string): + xml_path = os.path.join(base_dir, xml_string) + else: + xml_path = xml_string + + if os.path.exists(xml_path): + with open(xml_path, 'r') as f: + xml_string = f.read() + else: + raise FileNotFoundError(f"Arena XML file not found: {xml_path}") + + # Parse custom XML + custom_tree = ET.fromstring(xml_string) + + custom_worldbody = custom_tree.find(".//worldbody") + if custom_worldbody is None: + raise ValueError("Custom XML must have a worldbody element") + + # Replace worldbody content + for child in list(self.worldbody): + self.worldbody.remove(child) + + for element in custom_worldbody: + self.worldbody.append(element) + + # Merge assets + custom_asset = custom_tree.find(".//asset") + if custom_asset is not None and self.asset is not None: + for child in custom_asset: + self.asset.append(child) + + # Set floor reference + self.floor = self.worldbody.find(".//geom[@name='floor']") + if self.floor is None: + for geom in self.worldbody.findall(".//geom"): + if geom.get("type") == "plane": + self.floor = geom + break + + # Set table references if they exist + self.table_body = self.worldbody.find(".//body[@name='custom_table']") + self.table_top = self.worldbody.find(".//geom[@name='table_top']") + + return CustomXMLArena -class CustomMeshObject(MujocoXMLObject): - """Custom manipulable object from XML string with automatic joint and collision handling.""" - - _existing_joint_names = set() +def _get_object_class(): + """Get CustomMeshObject class (requires RoboSuite imports).""" + from robosuite.models.objects import MujocoXMLObject - def __init__(self, name, xml_string, scenic_file_path=None): - self.object_name = name - self.temp_dir = tempfile.mkdtemp() + class CustomMeshObject(MujocoXMLObject): + """Custom manipulable object from XML string with automatic joint and collision handling.""" - base_dir = scenic_file_path if scenic_file_path else os.getcwd() + _existing_joint_names = set() - # Handle file path vs XML string - if not xml_string.strip().startswith('<'): - if not os.path.isabs(xml_string): - xml_path = os.path.join(base_dir, xml_string) - else: - xml_path = xml_string + def __init__(self, name, xml_string, scenic_file_path=None): + self.object_name = name + self.temp_dir = tempfile.mkdtemp() - if os.path.exists(xml_path): - with open(xml_path, 'r') as f: - xml_string = f.read() - base_dir = os.path.dirname(xml_path) - else: - raise FileNotFoundError(f"Object XML file not found: {xml_path}") - - tree = ET.fromstring(xml_string) - - self._process_asset_files(tree, base_dir) - tree = self._process_xml_tree(tree, name) - - fixed_xml = ET.tostring(tree, encoding='unicode') - - temp_xml_path = os.path.join(self.temp_dir, f"{name}.xml") - with open(temp_xml_path, 'w') as f: - f.write(fixed_xml) - - try: - super().__init__( - temp_xml_path, - name=name, - joints=None, - obj_type="all", - duplicate_collision_geoms=False - ) - finally: - pass # Keep temp_dir for MuJoCo to access files - - def _process_asset_files(self, tree, base_dir): - """Copy mesh and texture files to temp directory.""" - # Process mesh files - for mesh in tree.findall(".//mesh"): - file_attr = mesh.get("file") - if file_attr: - if not os.path.isabs(file_attr): - full_path = os.path.join(base_dir, file_attr) + base_dir = scenic_file_path if scenic_file_path else os.getcwd() + + # Handle file path vs XML string + if not xml_string.strip().startswith('<'): + if not os.path.isabs(xml_string): + xml_path = os.path.join(base_dir, xml_string) else: - full_path = file_attr + xml_path = xml_string - if os.path.exists(full_path): - filename = os.path.basename(full_path) - temp_path = os.path.join(self.temp_dir, filename) - shutil.copy2(full_path, temp_path) - mesh.set("file", filename) - - # Auto-copy MTL file for OBJ meshes - if full_path.endswith('.obj'): - mtl_path = full_path.replace('.obj', '.mtl') - if os.path.exists(mtl_path): - mtl_filename = os.path.basename(mtl_path) - temp_mtl_path = os.path.join(self.temp_dir, mtl_filename) - shutil.copy2(mtl_path, temp_mtl_path) - - # Process texture files - for texture in tree.findall(".//texture"): - file_attr = texture.get("file") - if file_attr and not texture.get("builtin"): - if not os.path.isabs(file_attr): - full_path = os.path.join(base_dir, file_attr) + if os.path.exists(xml_path): + with open(xml_path, 'r') as f: + xml_string = f.read() + base_dir = os.path.dirname(xml_path) else: - full_path = file_attr - - if os.path.exists(full_path): - filename = os.path.basename(full_path) - - # Convert JPG to PNG if needed - if filename.lower().endswith(('.jpg', '.jpeg')): - try: - from PIL import Image - img = Image.open(full_path) - filename = filename.rsplit('.', 1)[0] + '.png' - temp_path = os.path.join(self.temp_dir, filename) - img.save(temp_path) - except ImportError: - temp_path = os.path.join(self.temp_dir, filename) - shutil.copy2(full_path, temp_path) + raise FileNotFoundError(f"Object XML file not found: {xml_path}") + + tree = ET.fromstring(xml_string) + + self._process_asset_files(tree, base_dir) + tree = self._process_xml_tree(tree, name) + + fixed_xml = ET.tostring(tree, encoding='unicode') + + temp_xml_path = os.path.join(self.temp_dir, f"{name}.xml") + with open(temp_xml_path, 'w') as f: + f.write(fixed_xml) + + try: + super().__init__( + temp_xml_path, + name=name, + joints=None, + obj_type="all", + duplicate_collision_geoms=False + ) + finally: + pass # Keep temp_dir for MuJoCo to access files + + def _process_asset_files(self, tree, base_dir): + """Copy mesh and texture files to temp directory.""" + # Process mesh files + for mesh in tree.findall(".//mesh"): + file_attr = mesh.get("file") + if file_attr: + if not os.path.isabs(file_attr): + full_path = os.path.join(base_dir, file_attr) else: + full_path = file_attr + + if os.path.exists(full_path): + filename = os.path.basename(full_path) temp_path = os.path.join(self.temp_dir, filename) shutil.copy2(full_path, temp_path) - - texture.set("file", filename) - - def _process_xml_tree(self, tree, name): - """Ensure joint and collision geometry.""" - # Find object body - object_body = tree.find(".//body[@name='object']") - if object_body is None: - worldbody = tree.find(".//worldbody") - if worldbody is not None: - for body in worldbody.findall("body"): - inner_body = body.find("body[@name='object']") - if inner_body is not None: - object_body = inner_body - break - - if object_body is None: - return tree - - self._ensure_free_joint(object_body, name) - self._ensure_collision_geometry(object_body, name) - - return tree - - def _ensure_free_joint(self, object_body, name): - """Ensure object has a free joint for physics.""" - has_free_joint = False - existing_joint_name = None - - for joint in object_body.findall("joint"): - if joint.get("type") == "free": - has_free_joint = True - existing_joint_name = joint.get("name") - if existing_joint_name: - self._existing_joint_names.add(existing_joint_name) - self.joint_name = existing_joint_name - break - - if not has_free_joint: - # Generate unique joint name - joint_counter = 0 - while True: - joint_name = f"{name}_joint_{joint_counter}" - if joint_name not in self._existing_joint_names: - break - joint_counter += 1 - - joint = ET.Element("joint") - joint.set("name", joint_name) - joint.set("type", "free") - joint.set("damping", "0.0005") - - object_body.insert(0, joint) - self._existing_joint_names.add(joint_name) - self.joint_name = joint_name - - def _ensure_collision_geometry(self, object_body, name): - """Ensure proper collision geometry exists.""" - has_proper_collision = False - mesh_collision_geom = None - visual_geom = None - - for geom in object_body.findall("geom"): - geom_group = geom.get("group", "0") - geom_type = geom.get("type", "box") - - if geom_group == "0": - if geom_type == "mesh": - mesh_collision_geom = geom - # print(f"Found mesh collision geometry for {name} - will convert to box") - else: - has_proper_collision = True - break - elif geom_group == "1": - visual_geom = geom - - # Convert mesh collision to visual + box collision - if mesh_collision_geom is not None and not has_proper_collision: - mesh_collision_geom.set("group", "1") - mesh_collision_geom.set("contype", "0") - mesh_collision_geom.set("conaffinity", "0") + mesh.set("file", filename) + + # Auto-copy MTL file for OBJ meshes + if full_path.endswith('.obj'): + mtl_path = full_path.replace('.obj', '.mtl') + if os.path.exists(mtl_path): + mtl_filename = os.path.basename(mtl_path) + temp_mtl_path = os.path.join(self.temp_dir, mtl_filename) + shutil.copy2(mtl_path, temp_mtl_path) - collision_geom = ET.Element("geom") - collision_geom.set("name", "collision_auto") - collision_geom.set("type", "box") - collision_geom.set("size", "0.04 0.04 0.04") + # Process texture files + for texture in tree.findall(".//texture"): + file_attr = texture.get("file") + if file_attr and not texture.get("builtin"): + if not os.path.isabs(file_attr): + full_path = os.path.join(base_dir, file_attr) + else: + full_path = file_attr + + if os.path.exists(full_path): + filename = os.path.basename(full_path) + + # Convert JPG to PNG if needed + if filename.lower().endswith(('.jpg', '.jpeg')): + try: + from PIL import Image + img = Image.open(full_path) + filename = filename.rsplit('.', 1)[0] + '.png' + temp_path = os.path.join(self.temp_dir, filename) + img.save(temp_path) + except ImportError: + temp_path = os.path.join(self.temp_dir, filename) + shutil.copy2(full_path, temp_path) + else: + temp_path = os.path.join(self.temp_dir, filename) + shutil.copy2(full_path, temp_path) + + texture.set("file", filename) + + def _process_xml_tree(self, tree, name): + """Ensure joint and collision geometry.""" + # Find object body + object_body = tree.find(".//body[@name='object']") + if object_body is None: + worldbody = tree.find(".//worldbody") + if worldbody is not None: + for body in worldbody.findall("body"): + inner_body = body.find("body[@name='object']") + if inner_body is not None: + object_body = inner_body + break - pos = mesh_collision_geom.get("pos", "0 0 0") - collision_geom.set("pos", pos) + if object_body is None: + return tree - collision_geom.set("group", "0") - collision_geom.set("rgba", "0 0 0 0") + self._ensure_free_joint(object_body, name) + self._ensure_collision_geometry(object_body, name) - for prop in ["solimp", "solref", "density", "friction"]: - value = mesh_collision_geom.get(prop) - if value: - collision_geom.set(prop, value) + return tree + + def _ensure_free_joint(self, object_body, name): + """Ensure object has a free joint for physics.""" + has_free_joint = False + existing_joint_name = None - if not collision_geom.get("solimp"): - collision_geom.set("solimp", "0.998 0.998 0.001") - if not collision_geom.get("solref"): - collision_geom.set("solref", "0.001 1") - if not collision_geom.get("density"): - collision_geom.set("density", "100") - if not collision_geom.get("friction"): - collision_geom.set("friction", "0.95 0.3 0.1") + for joint in object_body.findall("joint"): + if joint.get("type") == "free": + has_free_joint = True + existing_joint_name = joint.get("name") + if existing_joint_name: + self._existing_joint_names.add(existing_joint_name) + self.joint_name = existing_joint_name + break - index = list(object_body).index(mesh_collision_geom) + 1 - object_body.insert(index, collision_geom) - # print(f"Converted mesh collision to visual + box collision for {name}") - return - - # Create collision if none exists - if not has_proper_collision: - if visual_geom is not None: - collision_geom = ET.Element("geom") - collision_geom.set("name", "collision_auto") + if not has_free_joint: + # Generate unique joint name + joint_counter = 0 + while True: + joint_name = f"{name}_joint_{joint_counter}" + if joint_name not in self._existing_joint_names: + break + joint_counter += 1 - geom_type = visual_geom.get("type", "box") - pos = visual_geom.get("pos", "0 0 0") + joint = ET.Element("joint") + joint.set("name", joint_name) + joint.set("type", "free") + joint.set("damping", "0.0005") - if geom_type == "mesh": - collision_geom.set("type", "box") - collision_geom.set("size", "0.04 0.04 0.04") - else: - collision_geom.set("type", geom_type) - size = visual_geom.get("size") - if size: - collision_geom.set("size", size) + object_body.insert(0, joint) + self._existing_joint_names.add(joint_name) + self.joint_name = joint_name + + def _ensure_collision_geometry(self, object_body, name): + """Ensure proper collision geometry exists.""" + has_proper_collision = False + mesh_collision_geom = None + visual_geom = None + + for geom in object_body.findall("geom"): + geom_group = geom.get("group", "0") + geom_type = geom.get("type", "box") - collision_geom.set("pos", pos) + if geom_group == "0": + if geom_type == "mesh": + mesh_collision_geom = geom + else: + has_proper_collision = True + break + elif geom_group == "1": + visual_geom = geom + + # Convert mesh collision to visual + box collision + if mesh_collision_geom is not None and not has_proper_collision: + mesh_collision_geom.set("group", "1") + mesh_collision_geom.set("contype", "0") + mesh_collision_geom.set("conaffinity", "0") - else: collision_geom = ET.Element("geom") - collision_geom.set("name", "collision_default") + collision_geom.set("name", "collision_auto") collision_geom.set("type", "box") collision_geom.set("size", "0.04 0.04 0.04") - collision_geom.set("pos", "0 0 0") - - collision_geom.set("group", "0") - collision_geom.set("rgba", "0 0 0 0") - collision_geom.set("solimp", "0.998 0.998 0.001") - collision_geom.set("solref", "0.001 1") - collision_geom.set("density", "100") - collision_geom.set("friction", "0.95 0.3 0.1") - - if visual_geom is not None: - index = list(object_body).index(visual_geom) + 1 + + pos = mesh_collision_geom.get("pos", "0 0 0") + collision_geom.set("pos", pos) + + collision_geom.set("group", "0") + collision_geom.set("rgba", "0 0 0 0") + + for prop in ["solimp", "solref", "density", "friction"]: + value = mesh_collision_geom.get(prop) + if value: + collision_geom.set(prop, value) + + if not collision_geom.get("solimp"): + collision_geom.set("solimp", "0.998 0.998 0.001") + if not collision_geom.get("solref"): + collision_geom.set("solref", "0.001 1") + if not collision_geom.get("density"): + collision_geom.set("density", "100") + if not collision_geom.get("friction"): + collision_geom.set("friction", "0.95 0.3 0.1") + + index = list(object_body).index(mesh_collision_geom) + 1 object_body.insert(index, collision_geom) - else: - object_body.append(collision_geom) + return - # print(f"Added collision geometry to {name}") + # Create collision if none exists + if not has_proper_collision: + if visual_geom is not None: + collision_geom = ET.Element("geom") + collision_geom.set("name", "collision_auto") + + geom_type = visual_geom.get("type", "box") + pos = visual_geom.get("pos", "0 0 0") + + if geom_type == "mesh": + collision_geom.set("type", "box") + collision_geom.set("size", "0.04 0.04 0.04") + else: + collision_geom.set("type", geom_type) + size = visual_geom.get("size") + if size: + collision_geom.set("size", size) + + collision_geom.set("pos", pos) + + else: + collision_geom = ET.Element("geom") + collision_geom.set("name", "collision_default") + collision_geom.set("type", "box") + collision_geom.set("size", "0.04 0.04 0.04") + collision_geom.set("pos", "0 0 0") + + collision_geom.set("group", "0") + collision_geom.set("rgba", "0 0 0 0") + collision_geom.set("solimp", "0.998 0.998 0.001") + collision_geom.set("solref", "0.001 1") + collision_geom.set("density", "100") + collision_geom.set("friction", "0.95 0.3 0.1") + + if visual_geom is not None: + index = list(object_body).index(visual_geom) + 1 + object_body.insert(index, collision_geom) + else: + object_body.append(collision_geom) + + def __del__(self): + """Clean up temp directory.""" + if hasattr(self, 'temp_dir') and os.path.exists(self.temp_dir): + shutil.rmtree(self.temp_dir) - def __del__(self): - """Clean up temp directory.""" - if hasattr(self, 'temp_dir') and os.path.exists(self.temp_dir): - shutil.rmtree(self.temp_dir) + return CustomMeshObject -def _extract_cylinder_size(config: Dict) -> List[float]: - """Extract [radius, height] for cylinder-like objects.""" - if 'radius' in config and 'height' in config: - return [config['radius'], config['height']] - - size = config.get('size', [0.02, 0.02, 0.04]) - if len(size) == 3: - radius = (size[0] + size[1]) / 4 - return [radius, size[2]] - elif len(size) == 2: - return size - return [0.02, 0.04] - - -# Object type mapping for built-in RoboSuite objects -OBJECT_FACTORIES = { - 'Ball': lambda cfg: BallObject( - name=cfg['name'], - size=[cfg.get('radius', 0.02)], - rgba=cfg.get('color', [1, 0, 0, 1]) - ), - 'Box': lambda cfg: BoxObject( - name=cfg['name'], - size=cfg.get('size', [0.025, 0.025, 0.025]), - rgba=cfg.get('color', [1, 0, 0, 1]) - ), - 'Cylinder': lambda cfg: CylinderObject( - name=cfg['name'], - size=_extract_cylinder_size(cfg), - rgba=cfg.get('color', [1, 0, 0, 1]) - ), - 'Capsule': lambda cfg: CapsuleObject( - name=cfg['name'], - size=_extract_cylinder_size(cfg), - rgba=cfg.get('color', [1, 0, 0, 1]) - ), - 'Milk': lambda cfg: MilkObject(name=cfg['name']), - 'Cereal': lambda cfg: CerealObject(name=cfg['name']), - 'Can': lambda cfg: CanObject(name=cfg['name']), - 'Bread': lambda cfg: BreadObject(name=cfg['name']), - 'Hammer': lambda cfg: HammerObject(name=cfg['name']), - 'SquareNut': lambda cfg: SquareNutObject(name=cfg['name']), - 'RoundNut': lambda cfg: RoundNutObject(name=cfg['name']), - 'Bottle': lambda cfg: BottleObject(name=cfg['name']) -} - - -class ScenicManipulationEnv(ManipulationEnv): - """Scenic-driven manipulation environment.""" - - def __init__(self, scenic_config: Dict, **kwargs): - self.scenic_config = scenic_config - self.scenic_objects = scenic_config.get('objects', []) - self.scenic_tables = scenic_config.get('tables', []) - self.scenic_custom_arena = scenic_config.get('custom_arena', None) - super().__init__(**kwargs) - - def _load_model(self): - """Load models and create arena.""" - super()._load_model() - - self._position_robots() - mujoco_arena = self._create_arena() - mujoco_arena.set_origin([0, 0, 0]) - - self.mujoco_objects = self._create_objects() - - self.model = ManipulationTask( - mujoco_arena=mujoco_arena, - mujoco_robots=[robot.robot_model for robot in self.robots], - mujoco_objects=self.mujoco_objects - ) +def _get_env_class(): + """Get ScenicManipulationEnv class (requires RoboSuite imports).""" + from robosuite.environments.manipulation.manipulation_env import ManipulationEnv + from robosuite.models.arenas import EmptyArena, MultiTableArena + from robosuite.models.tasks import ManipulationTask + from robosuite.utils.observables import Observable, sensor - def _position_robots(self): - """Position robots based on scenic configuration.""" - for i, robot_config in enumerate(self.scenic_config.get('robots', [])): - if i < len(self.robots): - pos = robot_config.get('position', [0, 0, 0]) - self.robots[i].robot_model.set_base_xpos(pos) - - def _create_arena(self): - """Create arena based on configuration.""" - if self.scenic_custom_arena: - xml_string = self.scenic_custom_arena.get('xml', '') - scenic_file_path = self.scenic_custom_arena.get('scenic_file_path', None) - # print(f"DEBUG: Creating custom arena with scenic_file_path: {scenic_file_path}") - return CustomXMLArena(xml_string, scenic_file_path) - - if self.scenic_tables: - table_offsets = [] - for t in self.scenic_tables: - pos = t.get('position', [0, 0, 0.8]) - table_offsets.append([pos[0], pos[1], 0.8]) # Standard table height - - return MultiTableArena( - table_offsets=table_offsets, - table_full_sizes=[t.get('size', (0.8, 0.8, 0.05)) for t in self.scenic_tables], - has_legs=[True] * len(self.scenic_tables) - ) + class ScenicManipulationEnv(ManipulationEnv): + """Scenic-driven manipulation environment.""" - return EmptyArena() - - def _create_objects(self) -> List: - """Create Robosuite objects from Scenic configuration.""" - mujoco_objects = [] + def __init__(self, scenic_sim, **kwargs): + self.scenic_sim = scenic_sim + super().__init__(**kwargs) - for i, obj_config in enumerate(self.scenic_objects): - obj_type = obj_config['type'] - obj_name = obj_config.get('name', f'unnamed_{i}') + def _load_model(self): + """Load models and create arena.""" + super()._load_model() - if obj_type == 'MJCF': - scenic_file_path = obj_config.get('scenic_file_path', None) - - mj_obj = CustomMeshObject( - name=obj_name, - xml_string=obj_config.get('mjcf_xml', ''), - scenic_file_path=scenic_file_path - ) - else: - factory = OBJECT_FACTORIES.get(obj_type, OBJECT_FACTORIES['Box']) - mj_obj = factory(obj_config) + # Position robots directly from scenic_sim + for i, robot in enumerate(self.scenic_sim.robots): + if i < len(self.robots): + pos = [robot.position.x, robot.position.y, 0] + self.robots[i].robot_model.set_base_xpos(pos) - mujoco_objects.append(mj_obj) - - return mujoco_objects - - def _setup_references(self): - """Setup references to simulation objects.""" - super()._setup_references() - self.obj_body_ids = {} + mujoco_arena = self._create_arena() + mujoco_arena.set_origin([0, 0, 0]) + + self.mujoco_objects = self._create_objects() + + self.model = ManipulationTask( + mujoco_arena=mujoco_arena, + mujoco_robots=[robot.robot_model for robot in self.robots], + mujoco_objects=self.mujoco_objects + ) - for obj in self.mujoco_objects: - try: - if hasattr(obj, 'root_body'): - body_name = obj.root_body - else: - body_name = obj.name - - body_id = self.sim.model.body_name2id(body_name) - self.obj_body_ids[obj.name] = body_id - except: - for suffix in ['_main', '_body', '']: - try: - body_name = f"{obj.name}{suffix}" - body_id = self.sim.model.body_name2id(body_name) - self.obj_body_ids[obj.name] = body_id - break - except: - continue - - def _setup_observables(self) -> Dict: - """Setup observables for objects.""" - observables = super()._setup_observables() - - for obj in self.mujoco_objects: - if obj.name in self.obj_body_ids: - @sensor(modality="object") - def obj_pos(obs_cache, name=obj.name): - return np.array(self.sim.data.body_xpos[self.obj_body_ids[name]]) + def _create_arena(self): + """Create arena based on scenic objects.""" + # Check for custom arena + for obj in self.scenic_sim.objects: + if hasattr(obj, 'isCustomArena') and obj.isCustomArena: + xml_string = getattr(obj, 'arenaXml', '') + scenic_file_path = str(self.scenic_sim.scene.params.get('scenic_file_dir', '')) + return CustomXMLArena(xml_string, scenic_file_path) + + # Check for tables + tables = [obj for obj in self.scenic_sim.objects + if hasattr(obj, 'isTable') and obj.isTable] + + if tables: + table_offsets = [] + for t in tables: + table_offsets.append([t.position.x, t.position.y, 0.8]) - obj_pos.__name__ = f"{obj.name}_pos" - observables[obj_pos.__name__] = Observable( - name=obj_pos.__name__, - sensor=obj_pos, - sampling_rate=self.control_freq + return MultiTableArena( + table_offsets=table_offsets, + table_full_sizes=[(t.width, t.length, 0.05) for t in tables], + has_legs=[True] * len(tables) ) + + return EmptyArena() - return observables - - def _reset_internal(self): - """Reset environment internals.""" - super()._reset_internal() - - for obj_config, mj_obj in zip(self.scenic_objects, self.mujoco_objects): - if 'position' in obj_config: - pos = obj_config['position'] - quat = obj_config.get('quaternion', [1, 0, 0, 0]) - - joint_name = None - possible_names = [] - - if hasattr(mj_obj, 'joint_name'): - base_joint = mj_obj.joint_name - possible_names.append(f"{mj_obj.name}_{base_joint}") - possible_names.append(base_joint) - - if hasattr(mj_obj, 'joints') and mj_obj.joints: - possible_names.extend(mj_obj.joints) + def _create_objects(self) -> List: + """Create Robosuite objects from Scenic objects.""" + mujoco_objects = [] + scenic_file_path = str(self.scenic_sim.scene.params.get('scenic_file_dir', '')) + + for obj in self.scenic_sim.objects: + # Skip non-manipulable objects + if (hasattr(obj, '_isArenaComponent') and obj._isArenaComponent) or \ + hasattr(obj, 'robotType') or \ + hasattr(obj, 'isCustomArena') or \ + hasattr(obj, 'isTable'): + continue - possible_names.append(f"{mj_obj.name}_joint0") - possible_names.append(f"{mj_obj.name}_object_joint") + # Generate unique name + scenic_name = getattr(obj, 'name', None) + if hasattr(obj, 'objectType') and obj.objectType == 'MJCF': + obj_name = f"{scenic_name}_{len(mujoco_objects)}" if scenic_name else f"mjcf_object_{len(mujoco_objects)}" + mj_obj = CustomMeshObject( + name=obj_name, + xml_string=getattr(obj, 'mjcfXml', ''), + scenic_file_path=scenic_file_path + ) + elif hasattr(obj, 'makeRobosuiteObject'): + obj_name = f"{scenic_name}_{len(mujoco_objects)}" if scenic_name else f"{obj.__class__.__name__}_{len(mujoco_objects)}" + mj_obj = obj.makeRobosuiteObject(obj_name) + else: + continue - for name in possible_names: - if name in self.sim.model.joint_names: - joint_name = name + # Store mapping + self.scenic_sim.object_name_map[obj] = obj_name + mujoco_objects.append(mj_obj) + + return mujoco_objects + + def _reset_internal(self): + """Reset environment internals.""" + super()._reset_internal() + + # Set initial positions + for obj in self.scenic_sim.objects: + obj_name = self.scenic_sim.object_name_map.get(obj) + if not obj_name: + continue + + # Find corresponding mujoco object + mj_obj = None + for mobj in self.mujoco_objects: + if mobj.name == obj_name: + mj_obj = mobj break - if joint_name: - self.sim.data.set_joint_qpos( - joint_name, - np.concatenate([np.array(pos), np.array(quat)]) - ) - # else: - # print(f"Warning: Could not find joint for {mj_obj.name}") - - self.sim.forward() - self.sim.step() - - def reward(self, action=None) -> float: - """Compute reward.""" - return 0.0 - - def _check_success(self) -> bool: - """Check task success.""" - return False + if mj_obj: + pos = [obj.position.x, obj.position.y, obj.position.z] + quat = [1, 0, 0, 0] # Default quaternion + + # Find joint name + joint_name = None + possible_names = [] + + if hasattr(mj_obj, 'joint_name'): + base_joint = mj_obj.joint_name + possible_names.append(f"{mj_obj.name}_{base_joint}") + possible_names.append(base_joint) + + if hasattr(mj_obj, 'joints') and mj_obj.joints: + possible_names.extend(mj_obj.joints) + + possible_names.append(f"{mj_obj.name}_joint0") + possible_names.append(f"{mj_obj.name}_object_joint") + + for name in possible_names: + if name in self.sim.model.joint_names: + joint_name = name + break + + if joint_name: + self.sim.data.set_joint_qpos( + joint_name, + np.concatenate([np.array(pos), np.array(quat)]) + ) + + self.sim.forward() + self.sim.step() + + def _setup_references(self): + """Setup references to simulation objects.""" + super()._setup_references() + # Let scenic_sim handle body mapping + for obj in self.mujoco_objects: + try: + if hasattr(obj, 'root_body'): + body_name = obj.root_body + else: + body_name = obj.name + + body_id = self.sim.model.body_name2id(body_name) + self.scenic_sim.body_id_map[obj.name] = body_id + except: + for suffix in ['_main', '_body', '']: + try: + body_name = f"{obj.name}{suffix}" + body_id = self.sim.model.body_name2id(body_name) + self.scenic_sim.body_id_map[obj.name] = body_id + break + except: + continue + + def reward(self, action=None) -> float: + """Compute reward.""" + return 0.0 + + def _check_success(self) -> bool: + """Check task success.""" + return False + + return ScenicManipulationEnv class RobosuiteSimulator(Simulator): - """Simulator for RoboSuite custom environments.""" - + + """Simulator for RoboSuite robotic manipulation environments. + + Args: + render: Whether to render the simulation visually. If True, opens a + viewer window showing the simulation. Default True. + real_time: Whether to run the simulation in real-time. If False, runs + as fast as possible. Default True. + speed: Speed multiplier for real-time simulation. Values > 1.0 speed up + the simulation, < 1.0 slow it down. Only used when real_time=True. + Default 1.0. + env_config: Additional configuration passed to RoboSuite environment. + Can include settings like 'control_freq', 'horizon', etc. Default + empty dict. + controller_config: Robot controller configuration. If None, uses + RoboSuite's default controller for the robot type. Can specify + controller type and parameters. Default None. + camera_view: Name of camera to use for rendering. Options include + 'frontview', 'birdview', 'agentview', 'sideview', 'robot0_robotview', + 'robot0_eye_in_hand'. Default None (uses RoboSuite default). + lite_physics: Whether to use simplified physics for faster simulation. + Reduces physics accuracy but improves performance. Default None + (uses RoboSuite default). + """ + def __init__(self, render: bool = True, real_time: bool = True, speed: float = 1.0, env_config: Optional[Dict] = None, controller_config: Optional[Dict] = None, camera_view: Optional[str] = None, lite_physics: Optional[bool] = None): super().__init__() - if suite is None: - raise RuntimeError(f"Unable to import RoboSuite: {_import_error}") + + # Import RoboSuite only when creating simulator + try: + global mujoco, suite + import mujoco + import robosuite as suite + except ImportError as e: + raise RuntimeError(f"Unable to import RoboSuite: {e}") self.render = render self.real_time = real_time @@ -607,6 +590,13 @@ class RobosuiteSimulation(Simulation): def __init__(self, scene, render: bool, real_time: bool, speed: float, env_config: Optional[Dict], controller_config: Optional[Dict], camera_view: Optional[str], lite_physics: Optional[bool], **kwargs): + + # Load deferred classes + global CustomXMLArena, CustomMeshObject, ScenicManipulationEnv + CustomXMLArena = _get_arena_class() + CustomMeshObject = _get_object_class() + ScenicManipulationEnv = _get_env_class() + self.render = render self.real_time = real_time self.speed = speed @@ -636,19 +626,25 @@ def __init__(self, scene, render: bool, real_time: bool, speed: float, super().__init__(scene, **kwargs) + def setup(self): """Initialize the RoboSuite environment.""" super().setup() - scenic_config = self._extract_scenic_config() + # Extract robots for environment + robot_types = [] + for obj in self.objects: + if hasattr(obj, 'robotType'): + robot_types.append(obj.robotType) + self.robots.append(obj) - if not scenic_config['robots']: + if not robot_types: raise ValueError("At least one robot is required in the scene") - robot_arg = self._get_robot_arg(scenic_config['robots']) + robot_arg = robot_types[0] if len(robot_types) == 1 else robot_types env_kwargs = { - 'scenic_config': scenic_config, + 'scenic_sim': self, # Pass self directly 'robots': robot_arg, 'has_renderer': self.render, 'render_camera': self.camera_view, @@ -662,10 +658,11 @@ def setup(self): self.robosuite_env = ScenicManipulationEnv(**env_kwargs) self._current_obs = self.robosuite_env.reset() - self._detect_controller_type(scenic_config['robots']) + self._detect_controller_type() self._finalize_setup() - + + def _finalize_setup(self): """Common setup after environment creation.""" self.model = self.robosuite_env.sim.model._model @@ -693,14 +690,11 @@ def _extract_scenic_config(self) -> Dict: scenic_file_path = None - # Check if user explicitly set scenic_file_dir param if 'scenic_file_dir' in self.scene.params: path_obj = self.scene.params.get('scenic_file_dir') if path_obj: scenic_file_path = str(path_obj) - # print(f"Found scenic_file_dir from params: {scenic_file_path}") - # Try to get from command line arguments if not scenic_file_path: import sys for arg in sys.argv: @@ -708,32 +702,25 @@ def _extract_scenic_config(self) -> Dict: scenic_file_abspath = os.path.abspath(arg) if os.path.exists(scenic_file_abspath): scenic_file_path = os.path.dirname(scenic_file_abspath) - # print(f"Found scenic file path from sys.argv: {scenic_file_path}") break - # Fallback to current directory if not scenic_file_path: scenic_file_path = os.getcwd() - # print(f"WARNING: Could not determine Scenic file directory") - # print(f" Add to your .scenic file: param scenic_file_dir = localPath(\".\")") - # print(f"DEBUG: Final scenic file directory: {scenic_file_path}") - - # Process all objects in the scene for obj in self.objects: if hasattr(obj, '_isArenaComponent') and obj._isArenaComponent: continue - if hasattr(obj, 'robot_type'): + if hasattr(obj, 'robotType'): self._add_robot_config(config['robots'], obj) elif hasattr(obj, 'isCustomArena') and obj.isCustomArena: config['custom_arena'] = { - 'xml': getattr(obj, 'arena_xml', ''), + 'xml': getattr(obj, 'arenaXml', ''), 'scenic_file_path': scenic_file_path } elif hasattr(obj, 'isTable') and obj.isTable: self._add_table_config(config['tables'], obj) - elif hasattr(obj, 'objectType'): + elif hasattr(obj, 'makeRobosuiteObject') or (hasattr(obj, 'objectType') and obj.objectType == 'MJCF'): self._add_object_config(config['objects'], obj, scenic_file_path) return config @@ -741,44 +728,43 @@ def _extract_scenic_config(self) -> Dict: def _add_robot_config(self, robots: List, obj): """Add robot configuration.""" robots.append({ - 'type': obj.robot_type, + 'type': obj.robotType, 'position': [obj.position.x, obj.position.y, 0] }) self.robots.append(obj) - + def _add_table_config(self, tables: List, obj): """Add table configuration.""" tables.append({ 'position': [obj.position.x, obj.position.y, obj.position.z], - 'size': (obj.width, obj.length, obj.height) + 'size': (obj.width, obj.length, 0.05) # MultiTableArena tabletop thickness }) def _add_object_config(self, objects: List, obj, scenic_file_path): """Add object configuration.""" scenic_name = getattr(obj, 'name', None) - if obj.objectType == 'MJCF': + if hasattr(obj, 'objectType') and obj.objectType == 'MJCF': obj_name = f"{scenic_name}_{len(objects)}" if scenic_name else f"mjcf_object_{len(objects)}" + obj_type = 'MJCF' else: - base_name = scenic_name if scenic_name else obj.objectType + base_name = scenic_name if scenic_name else obj.__class__.__name__ obj_name = f"{base_name}_{len(objects)}" + obj_type = obj.__class__.__name__ config = { - 'type': obj.objectType, + 'type': obj_type, 'name': obj_name, 'position': [obj.position.x, obj.position.y, obj.position.z], - 'color': getattr(obj, 'color', [1, 0, 0, 1]) + 'color': getattr(obj, 'color', [1, 0, 0, 1]), + 'scenic_obj': obj # Store reference to the Scenic object } self.object_name_map[obj] = obj_name - if obj.objectType == 'MJCF': - config['mjcf_xml'] = getattr(obj, 'mjcf_xml', '') + if obj_type == 'MJCF': + config['mjcfXml'] = getattr(obj, 'mjcfXml', '') config['scenic_file_path'] = scenic_file_path - elif obj.objectType == 'Ball' and hasattr(obj, 'radius'): - config['radius'] = obj.radius - elif obj.objectType in ['Box', 'Cylinder', 'Capsule'] and hasattr(obj, 'width'): - config['size'] = [obj.width, obj.length, obj.height] objects.append(config) @@ -789,9 +775,9 @@ def _get_robot_arg(self, robots: List) -> Union[str, List[str]]: return ([r['type'] for r in robots] if len(robots) > 1 else robots[0]['type']) - def _detect_controller_type(self, robot_configs: List): + def _detect_controller_type(self): """Detect controller type from first robot.""" - if robot_configs and self.robosuite_env.robots: + if self.robots and self.robosuite_env.robots: first_robot = self.robosuite_env.robots[0] if hasattr(first_robot, 'controller'): self.controller_type = type(first_robot.controller).__name__ @@ -829,16 +815,8 @@ def createObjectInSimulator(self, obj): def executeActions(self, allActions: Dict[Any, List]) -> None: """Execute actions by calling their applyTo methods.""" + self.pending_robot_action = None super().executeActions(allActions) - - self.pending_robot_action = None - - for agent in self.agents: - if agent in allActions and allActions[agent]: - for action in allActions[agent]: - if action and hasattr(action, 'applyTo'): - action.applyTo(agent, self) - break def step(self): """Step the simulation forward one timestep.""" @@ -870,11 +848,11 @@ def getProperties(self, obj, properties: List[str]) -> Dict[str, Any]: for prop in properties: if prop == 'position': values[prop] = self._get_object_position(obj) - elif prop == 'joint_positions' and robot_idx is not None: + elif prop == 'jointPositions' and robot_idx is not None: values[prop] = self._get_robot_joints(robot_idx) - elif prop == 'eef_pos' and robot_idx is not None: + elif prop == 'eefPos' and robot_idx is not None: values[prop] = self._get_eef_position(robot_idx) - elif prop == 'gripper_state' and robot_idx is not None: + elif prop == 'gripperState' and robot_idx is not None: values[prop] = self._get_gripper_state(robot_idx) else: values[prop] = getattr(obj, prop, None) @@ -882,19 +860,14 @@ def getProperties(self, obj, properties: List[str]) -> Dict[str, Any]: return values def _get_object_position(self, obj) -> Vector: - """Get object position.""" obj_name = self.object_name_map.get(obj) - if obj_name and self._current_obs: - obs_key = f"{obj_name}_pos" - if obs_key in self._current_obs: - pos = self._current_obs[obs_key] - return Vector(pos[0], pos[1], pos[2]) - elif obj_name in self.body_id_map: - body_id = self.body_id_map[obj_name] - pos = self.data.xpos[body_id] - return Vector(pos[0], pos[1], pos[2]) + if obj_name and obj_name in self.body_id_map: + body_id = self.body_id_map[obj_name] + pos = self.data.xpos[body_id] + return Vector(pos[0], pos[1], pos[2]) return obj.position - + + def _get_robot_joints(self, robot_idx: int) -> List: """Get robot joint positions.""" if robot_idx < len(self.robosuite_env.robots): @@ -917,11 +890,11 @@ def getCurrentObservation(self) -> Optional[Dict]: """Get current observation dictionary.""" return self._current_obs - def checkSuccess(self) -> bool: - """Check if task is successfully completed.""" - if hasattr(self.robosuite_env, '_check_success'): - return self.robosuite_env._check_success() - return False + # def checkSuccess(self) -> bool: + # """Check if task is successfully completed.""" + # if hasattr(self.robosuite_env, '_check_success'): + # return self.robosuite_env._check_success() + # return False def destroy(self): """Clean up simulation resources.""" @@ -930,24 +903,34 @@ def destroy(self): self.robosuite_env = None -# Actions -class SetJointPositions(Action): - """Set robot joint positions.""" +# Controller Interface +class OSCPositionAction(Action): + """Operational Space Control for end-effector.""" + + def __init__(self, position_delta: Optional[List[float]] = None, + orientation_delta: Optional[List[float]] = None, + gripper: Optional[float] = None): + self.position_delta = position_delta or [0, 0, 0] + self.orientation_delta = orientation_delta or [0, 0, 0] + self.gripper = gripper if gripper is not None else 0 - def __init__(self, positions: List[float]): - self.positions = positions + def canBeTakenBy(self, agent) -> bool: + """Only robots can take this action.""" + return hasattr(agent, 'robotType') def applyTo(self, agent, sim: RobosuiteSimulation): """Apply action to agent.""" - if hasattr(sim, 'robots') and agent in sim.robots: - robot_idx = sim.robots.index(agent) - if robot_idx < len(sim.robosuite_env.robots): - sim.pending_robot_action = np.array(self.positions) - - -class OSCPositionAction(Action): + robot_idx = sim.robots.index(agent) + + # OSC control always uses 7D action vector + action = np.zeros(7) + action[:3] = self.position_delta + action[3:6] = self.orientation_delta + action[6] = self.gripper + + sim.pending_robot_action = action """Operational Space Control for end-effector.""" - + def __init__(self, position_delta: Optional[List[float]] = None, orientation_delta: Optional[List[float]] = None, gripper: Optional[float] = None): @@ -955,18 +938,21 @@ def __init__(self, position_delta: Optional[List[float]] = None, self.orientation_delta = orientation_delta or [0, 0, 0] self.gripper = gripper if gripper is not None else 0 + def canBeTakenBy(self, agent) -> bool: + """Only robots can take this action.""" + return hasattr(agent, 'robotType') + def applyTo(self, agent, sim: RobosuiteSimulation): """Apply action to agent.""" - if hasattr(sim, 'robots') and agent in sim.robots: - robot_idx = sim.robots.index(agent) - if robot_idx < len(sim.robosuite_env.robots): - if sim.controller_type == 'JOINT_POSITION': - action = np.zeros(sim.action_dim) - action[:3] = self.position_delta - else: - action = np.zeros(7) - action[:3] = self.position_delta - action[3:6] = self.orientation_delta - action[6] = self.gripper - - sim.pending_robot_action = action \ No newline at end of file + robot_idx = sim.robots.index(agent) + + if sim.controller_type == 'JOINT_POSITION': + action = np.zeros(sim.action_dim) + action[:3] = self.position_delta + else: + action = np.zeros(7) + action[:3] = self.position_delta + action[3:6] = self.orientation_delta + action[6] = self.gripper + + sim.pending_robot_action = action \ No newline at end of file