Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions agent.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Agent Notes: Adding a New Robot

This file is intentionally brief. For detailed implementation info, see existing project docs and code.

## Before You Start

Read these first for required context:

- [CONTRIBUTING.md](CONTRIBUTING.md) — Project conventions, control flow, unit standards
- [README.md](README.md) — Public API style and usage patterns

## Implementation Steps

1. Use the blank robot template (`armctl/_blank/robot.py`) as your structural baseline
2. Choose communication layer: SocketController, SerialController, or PLCController from the templates directory
3. Ensure all abstract methods and properties from inherited base classes (Commands, Properties, and communication layer) are fully implemented
4. Study existing robot implementations in the `armctl/` directory for API consistency patterns
5. Keep conversions and checks aligned with project-wide conventions from CONTRIBUTING.md
6. Add/update tests for your integration

## Key Decisions

**Dependencies:** Prefer official first-party robot SDKs when available and stable. Use custom implementation if no suitable first-party option exists.

**Documentation:** As you discover manufacturer documentation/specs/resources, add links to the robot's folder README or inline code comments. Keep it minimal—only include links directly relevant to implementation and maintenance.
2 changes: 2 additions & 0 deletions armctl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

# from .fanuc import Fanuc
from .vention import Vention
from .zaber import Zaber

__all__ = [
"ElephantRobotics",
Expand All @@ -33,6 +34,7 @@
"UR16",
"Vention",
"Jaka",
"Zaber",
]

__version__ = "0.3.4"
Expand Down
5 changes: 5 additions & 0 deletions armctl/_blank/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Blank Robot Template

Template for new robot implementations. Add discovered resources and documentation links below.

## Resources
6 changes: 6 additions & 0 deletions armctl/zaber/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Zaber

ASCII Protocol Manual: [Here](https://www.zaber.com/protocol-manual)
Python API Reference: [Here](https://software.zaber.com/motion-library/api/py)

Python Examples: [Here](https://github.com/zabertech/zaber-examples/tree/main/examples)
1 change: 1 addition & 0 deletions armctl/zaber/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .zaber import Zaber
231 changes: 231 additions & 0 deletions armctl/zaber/zaber.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
from __future__ import annotations

import time
from zaber_motion import ascii, Units

from armctl.templates import Commands, Properties
from armctl.utils import CommandCheck as cc
from armctl.templates.logger import logger


class Zaber(Commands, Properties):
"""
Zaber motion controller for linear actuators.

Uses the official zaber-motion library for ASCII protocol communication.
Supports multi-axis linear motion systems via TCP connection.

Reference:
- ASCII Protocol: https://www.zaber.com/protocol-manual
- Python API: https://software.zaber.com/motion-library/api/py
- Examples: https://github.com/zabertech/zaber-examples
"""

def __init__(
self,
ip: str,
port: int = 55550,
device_address: int = 1,
num_axes: int = 1,
):
"""
Initialize Zaber device connection parameters.

Args:
ip: IP address of Zaber device or network interface
port: TCP port (default: 55550)
device_address: Device address on the bus (default: 1)
num_axes: Number of axes on this device (auto-detected if not set)
"""
self.ip = ip
self.port = port
self.device_address = device_address
self.num_axes = num_axes

self.connection = None
self.device = None
self.axes = []

# Define Properties - for linear systems, ranges are in meters
# Default: single axis with typical range
self.JOINT_RANGES: list[tuple[float, float]] = [
(0.0, 0.3)
for _ in range(num_axes) # 0-300mm typical range
]
self.MAX_JOINT_VELOCITY: float | None = 10.0 # m/s
self.MAX_JOINT_ACCELERATION: float | None = None

self.units = Units.LENGTH_METRES

def connect(self) -> None:
"""Establish connection to Zaber device via TCP."""
try:
logger.info(f"Connecting to Zaber at {self.ip}:{self.port}")
# Open TCP connection using first-party zaber-motion library
self.connection = ascii.Connection.open_tcp(self.ip, self.port)

# Get the device
self.device = self.connection.get_device(self.device_address)
logger.info(f"Connected to device at address {self.device_address}")

# Identify and cache available axes
identity = self.device.identify()
self.num_axes = identity.axis_count
logger.info(f"Device has {self.num_axes} axes")

# Get axis objects for later use
self.axes = [
self.device.get_axis(i + 1) for i in range(self.num_axes)
]

except Exception as e:
logger.error(f"Failed to connect to Zaber: {e}")
raise

def disconnect(self) -> None:
"""Close connection to Zaber device."""
try:
if self.connection:
self.connection.close()
logger.info("Disconnected from Zaber device")
except Exception as e:
logger.warning(f"Error during disconnect: {e}")

def __enter__(self):
"""Context manager entry."""
self.connect()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
self.disconnect()
return False

def sleep(self, seconds: float) -> None:
"""Sleep for the specified number of seconds."""
cc.sleep(seconds)
time.sleep(seconds)

def move_joints(self, pos: list[float]) -> None:
"""
Move axes to specified positions in meters.

Args:
pos: List of positions in meters (one per axis)
"""
cc.move_joints(pos)

try:
if len(pos) != len(self.axes):
raise ValueError(
f"Expected {len(self.axes)} positions, got {len(pos)}"
)

# Move each axis to target position
for i, axis in enumerate(self.axes):
target = pos[i]
logger.debug(f"Moving axis {i + 1} to {target} m")
axis.move_absolute(target, unit=self.units)

# Wait for all axes to complete movement
for axis in self.axes:
axis.wait_until_idle()

except Exception as e:
logger.error(f"Failed to move joints: {e}")
raise

def get_joint_positions(self) -> list[float]:
"""Get current position of all axes in meters."""
cc.get_joint_positions()

try:
positions = []
for i, axis in enumerate(self.axes):
pos = axis.get_position(unit=self.units)
positions.append(pos)
logger.debug(f"Axis {i + 1} position: {pos} m")
return positions

except Exception as e:
logger.error(f"Failed to get joint positions: {e}")
raise

def move_cartesian(self, pose: list[float]) -> None:
"""
Move to cartesian position.

For linear systems, this is equivalent to move_joints.

Args:
pose: List of positions in meters [x, y, z, ...] or applicable axes
"""
cc.move_cartesian(pose)
# For linear systems, cartesian movement is same as joint movement
self.move_joints(pose[: len(self.axes)])

def get_cartesian_position(self) -> list[float]:
"""
Get current cartesian position.

For linear systems, this is equivalent to get_joint_positions.
"""
cc.get_cartesian_position()
return self.get_joint_positions()

def stop_motion(self) -> None:
"""Stop all axes immediately."""
cc.stop_motion()

try:
logger.info("Stopping all axes")
for i, axis in enumerate(self.axes):
axis.stop()

except Exception as e:
logger.error(f"Failed to stop motion: {e}")
raise

def get_robot_state(self) -> dict:
"""Get current device and axis states."""
cc.get_robot_state()

try:
state = {
"device_address": self.device_address,
"num_axes": self.num_axes,
"axes": [],
}

for i, axis in enumerate(self.axes):
axis_state = axis.get_state()
state["axes"].append(
{
"axis": i + 1,
"is_moving": axis.is_busy(),
"is_homed": axis.is_homed(),
"state": str(axis_state),
}
)

return state

except Exception as e:
logger.error(f"Failed to get robot state: {e}")
raise

def home(self) -> None:
"""Home all axes to their home position."""
try:
logger.info("Homing all axes")
for i, axis in enumerate(self.axes):
axis.home()
logger.debug(f"Homing axis {i + 1}")

# Wait for all axes to complete homing
for axis in self.axes:
axis.wait_until_idle()

except Exception as e:
logger.error(f"Failed to home axes: {e}")
raise
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dependencies = [
"urrtde",
"typer>=0.9.0",
"pyserial>=3.5",
"zaber-motion>=2.13.1",
]

[project.urls]
Expand Down
Loading