From 40da62343bf9a03a1d5344903418c0d4db4433a5 Mon Sep 17 00:00:00 2001 From: mustafab0 Date: Sat, 7 Feb 2026 12:53:46 -0800 Subject: [PATCH 1/9] TF support on manipulation module and Object Input topic support --- dimos/manipulation/manipulation_module.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dimos/manipulation/manipulation_module.py b/dimos/manipulation/manipulation_module.py index 33dea3369..abab90dc9 100644 --- a/dimos/manipulation/manipulation_module.py +++ b/dimos/manipulation/manipulation_module.py @@ -47,6 +47,7 @@ if TYPE_CHECKING: from dimos.core.rpc_client import RPCClient from dimos.msgs.geometry_msgs import Pose + from dimos.perception.detection.type.detection3d.object import Object as DetObject logger = setup_logger() From b72b11f665647d2cb1277a6ccdf6ede9ebabdffe Mon Sep 17 00:00:00 2001 From: mustafab0 Date: Sat, 7 Feb 2026 15:56:33 -0800 Subject: [PATCH 2/9] blueprint added for xarm7 and realsense robot --- dimos/manipulation/manipulation_module.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dimos/manipulation/manipulation_module.py b/dimos/manipulation/manipulation_module.py index abab90dc9..33dea3369 100644 --- a/dimos/manipulation/manipulation_module.py +++ b/dimos/manipulation/manipulation_module.py @@ -47,7 +47,6 @@ if TYPE_CHECKING: from dimos.core.rpc_client import RPCClient from dimos.msgs.geometry_msgs import Pose - from dimos.perception.detection.type.detection3d.object import Object as DetObject logger = setup_logger() From 3f008f51e7450047018a660008cf5aea98779a2d Mon Sep 17 00:00:00 2001 From: mustafab0 Date: Wed, 11 Feb 2026 13:31:38 -0800 Subject: [PATCH 3/9] updated rrt planner to have error margins since real world encoder values have tiny errors that can get out of joint limit range --- dimos/manipulation/planning/planners/rrt_planner.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/dimos/manipulation/planning/planners/rrt_planner.py b/dimos/manipulation/planning/planners/rrt_planner.py index e8d34d94c..f2be8736d 100644 --- a/dimos/manipulation/planning/planners/rrt_planner.py +++ b/dimos/manipulation/planning/planners/rrt_planner.py @@ -187,18 +187,19 @@ def _validate_inputs( "Goal configuration is in collision", ) - # Check limits (extract arrays for numpy comparison) + # Check limits with small tolerance for driver floating-point drift lower, upper = world.get_joint_limits(robot_id) q_start = np.array(start.position, dtype=np.float64) q_goal = np.array(goal.position, dtype=np.float64) + limit_eps = 1e-3 # ~0.06 degrees - if np.any(q_start < lower) or np.any(q_start > upper): + if np.any(q_start < lower - limit_eps) or np.any(q_start > upper + limit_eps): return _create_failure_result( PlanningStatus.INVALID_START, "Start configuration is outside joint limits", ) - if np.any(q_goal < lower) or np.any(q_goal > upper): + if np.any(q_goal < lower - limit_eps) or np.any(q_goal > upper + limit_eps): return _create_failure_result( PlanningStatus.INVALID_GOAL, "Goal configuration is outside joint limits", From 21a6a29e897c9d2d7381e036476f2d6f4235a0a5 Mon Sep 17 00:00:00 2001 From: mustafab0 Date: Wed, 11 Feb 2026 13:33:33 -0800 Subject: [PATCH 4/9] coordinator client updated to use task invoke --- .../control/coordinator_client.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/dimos/manipulation/control/coordinator_client.py b/dimos/manipulation/control/coordinator_client.py index b30c1f46f..d4c717072 100644 --- a/dimos/manipulation/control/coordinator_client.py +++ b/dimos/manipulation/control/coordinator_client.py @@ -121,20 +121,25 @@ def get_joint_positions(self) -> dict[str, float]: return self._rpc.get_joint_positions() or {} def get_trajectory_status(self, task_name: str) -> dict[str, Any]: - """Get status of a trajectory task.""" - return self._rpc.get_trajectory_status(task_name) or {} + """Get status of a trajectory task via task_invoke.""" + result = self._rpc.task_invoke(task_name, "get_status", {}) + if isinstance(result, dict): + return result + return {} # ========================================================================= - # Trajectory execution (RPC calls) + # Trajectory execution (via task_invoke) # ========================================================================= def execute_trajectory(self, task_name: str, trajectory: JointTrajectory) -> bool: - """Execute a trajectory on a task.""" - return self._rpc.execute_trajectory(task_name, trajectory) or False + """Execute a trajectory on a task via task_invoke.""" + result = self._rpc.task_invoke(task_name, "execute", {"trajectory": trajectory}) + return bool(result) def cancel_trajectory(self, task_name: str) -> bool: - """Cancel an active trajectory.""" - return self._rpc.cancel_trajectory(task_name) or False + """Cancel an active trajectory via task_invoke.""" + result = self._rpc.task_invoke(task_name, "cancel", {}) + return bool(result) # ========================================================================= # Task selection and setup From d250b21679113a6d8e1e8f76631b97d7a3b93bbc Mon Sep 17 00:00:00 2001 From: mustafab0 Date: Wed, 11 Feb 2026 14:14:04 -0800 Subject: [PATCH 5/9] implemented pick and place skill with agentic pipeline --- dimos/manipulation/manipulation_module.py | 797 ++++++++++++++++++++- dimos/manipulation/planning/spec/config.py | 4 + pyproject.toml | 1 + 3 files changed, 772 insertions(+), 30 deletions(-) diff --git a/dimos/manipulation/manipulation_module.py b/dimos/manipulation/manipulation_module.py index 33dea3369..4e48d3a29 100644 --- a/dimos/manipulation/manipulation_module.py +++ b/dimos/manipulation/manipulation_module.py @@ -12,17 +12,26 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Manipulation Module - Motion planning with ControlCoordinator execution.""" +"""Manipulation Module - Motion planning with ControlCoordinator execution. + +Skills are organized in tiers: +- Tier 1 (@rpc): Low-level building blocks (plan_to_pose, execute, etc.) +- Tier 2 (@skill): Individually testable actions (move_to_pose, open_gripper, etc.) +- Tier 3 (@skill): Composed behaviors (pick, place, pick_and_place) +""" from __future__ import annotations from dataclasses import dataclass, field from enum import Enum +import math import threading +import time from typing import TYPE_CHECKING, TypeAlias -from dimos.core import In, Module, rpc +from dimos.core import In, rpc from dimos.core.module import ModuleConfig +from dimos.core.skill_module import SkillModule from dimos.manipulation.planning import ( JointPath, JointTrajectoryGenerator, @@ -42,11 +51,14 @@ from dimos.msgs.sensor_msgs import JointState from dimos.msgs.trajectory_msgs import JointTrajectory from dimos.perception.detection.type.detection3d.object import Object as DetObject # noqa: TC001 +from dimos.protocol.skill.skill import skill from dimos.utils.logging_config import setup_logger if TYPE_CHECKING: + from collections.abc import Generator + from dimos.core.rpc_client import RPCClient - from dimos.msgs.geometry_msgs import Pose + from dimos.msgs.geometry_msgs import Pose, PoseArray logger = setup_logger() @@ -85,8 +97,14 @@ class ManipulationModuleConfig(ModuleConfig): kinematics_name: str = "jacobian" # "jacobian" or "drake_optimization" -class ManipulationModule(Module): - """Motion planning module with ControlCoordinator execution.""" +class ManipulationModule(SkillModule): + """Motion planning module with ControlCoordinator execution. + + Provides three tiers of interface: + - @rpc methods: Low-level building blocks (plan, execute, obstacles) + - @skill methods: Agent-callable actions (move_to_pose, open_gripper, pick, place) + - Internal helpers: Trajectory waiting, pose construction, grasp generation + """ default_config = ManipulationModuleConfig @@ -99,6 +117,11 @@ class ManipulationModule(Module): # Input: Objects from perception (for obstacle integration) objects: In[list[DetObject]] + # RPC calls for GraspGen integration (resolved at runtime if modules are deployed) + rpc_calls: list[str] = [ + "GraspingModule.generate_grasps", + ] + def __init__(self, *args: object, **kwargs: object) -> None: super().__init__(*args, **kwargs) @@ -122,6 +145,18 @@ def __init__(self, *args: object, **kwargs: object) -> None: # Coordinator integration (lazy initialized) self._coordinator_client: RPCClient | None = None + # Init joints: captured from first joint state received, used by go_init + self._init_joints: list[float] | None = None + self._init_joints_captured = False + + # Last pick position: stored during pick so place_back() can return the object + self._last_pick_position: tuple[float, float, float] | None = None + + # Snapshotted detections from the last scan_objects/refresh call. + # The live detection cache is volatile (labels change every frame), + # so pick/place use this stable snapshot instead. + self._detection_snapshot: list[dict[str, object]] = [] + # TF publishing thread self._tf_stop_event = threading.Event() self._tf_thread: threading.Thread | None = None @@ -208,7 +243,7 @@ def _get_robot( Returns: (robot_name, robot_id, config, traj_gen) or None if not found """ - if robot_name is None: + if not robot_name: # None or empty string (LLMs often pass "") robot_name = self._get_default_robot_name() if robot_name is None: logger.error("Multiple robots configured, must specify robot_name") @@ -230,6 +265,14 @@ def _on_joint_state(self, msg: JointState) -> None: if self._world_monitor is not None: self._world_monitor.on_joint_state(msg, robot_id=None) + # Capture initial joint positions on first callback + if not self._init_joints_captured and msg.position: + self._init_joints = list(msg.position) + self._init_joints_captured = True + logger.info( + f"Init joints captured: [{', '.join(f'{j:.3f}' for j in self._init_joints)}]" + ) + except Exception as e: logger.error(f"Exception in _on_joint_state: {e}") import traceback @@ -583,8 +626,46 @@ def get_robot_info(self, robot_name: RobotName | None = None) -> dict[str, objec "max_acceleration": config.max_acceleration, "has_joint_name_mapping": bool(config.joint_name_mapping), "coordinator_task_name": config.coordinator_task_name, + "home_joints": config.home_joints, + "pre_grasp_offset": config.pre_grasp_offset, + "init_joints": self._init_joints, } + @rpc + def get_init_joints(self) -> list[float] | None: + """Get the init joint positions (captured at startup or set manually).""" + return self._init_joints + + @rpc + def set_init_joints(self, joints: list[float]) -> bool: + """Set the init joint positions. + + Args: + joints: New init joint configuration + """ + self._init_joints = list(joints) + self._init_joints_captured = True + logger.info(f"Init joints set: [{', '.join(f'{j:.3f}' for j in self._init_joints)}]") + return True + + @rpc + def set_init_joints_to_current(self, robot_name: RobotName | None = None) -> bool: + """Set init joints to the current joint positions. + + Args: + robot_name: Robot to capture from (required if multiple robots configured) + """ + current = self.get_current_joints(robot_name) + if current is None: + logger.error("Cannot capture init joints — no current joint state") + return False + self._init_joints = current + self._init_joints_captured = True + logger.info( + f"Init joints set to current: [{', '.join(f'{j:.3f}' for j in self._init_joints)}]" + ) + return True + # ========================================================================= # Coordinator Integration RPC Methods # ========================================================================= @@ -669,14 +750,19 @@ def execute(self, robot_name: RobotName | None = None) -> bool: def get_trajectory_status( self, robot_name: RobotName | None = None ) -> dict[str, object] | None: - """Get trajectory execution status.""" + """Get trajectory execution status via coordinator task_invoke.""" if (robot := self._get_robot(robot_name)) is None: return None _, _, config, _ = robot if not config.coordinator_task_name or (client := self._get_coordinator_client()) is None: return None - status = client.get_trajectory_status(config.coordinator_task_name) - return dict(status) if status else None + try: + state = client.task_invoke(config.coordinator_task_name, "get_state", {}) + if state is not None: + return {"state": int(state), "task": config.coordinator_task_name} + return None + except Exception: + return None @property def world_monitor(self) -> WorldMonitor | None: @@ -738,10 +824,17 @@ def remove_obstacle(self, obstacle_id: str) -> bool: @rpc def refresh_obstacles(self, min_duration: float = 0.0) -> list[dict[str, object]]: - """Refresh perception obstacles. Returns the list of obstacles added.""" + """Refresh perception obstacles. Returns the list of obstacles added. + + Also snapshots the current detections so pick/place can use stable labels. + """ if self._world_monitor is None: return [] - return self._world_monitor.refresh_obstacles(min_duration) + result = self._world_monitor.refresh_obstacles(min_duration) + # Snapshot detections at refresh time — the live cache is volatile + self._detection_snapshot = self._world_monitor.list_cached_detections() + logger.info(f"Detection snapshot: {[d.get('name') for d in self._detection_snapshot]}") + return result @rpc def clear_perception_obstacles(self) -> int: @@ -772,7 +865,7 @@ def list_added_obstacles(self) -> list[dict[str, object]]: return self._world_monitor.list_added_obstacles() # ========================================================================= - # Gripper RPC Methods + # Gripper Methods # ========================================================================= def _get_gripper_hardware_id(self, robot_name: RobotName | None = None) -> str | None: @@ -786,14 +879,8 @@ def _get_gripper_hardware_id(self, robot_name: RobotName | None = None) -> str | return None return config.gripper_hardware_id - @rpc - def set_gripper(self, position: float, robot_name: RobotName | None = None) -> bool: - """Set gripper position in meters. - - Args: - position: Gripper position in meters - robot_name: Robot to control (required if multiple robots configured) - """ + def _set_gripper_position(self, position: float, robot_name: RobotName | None = None) -> bool: + """Internal: set gripper position in meters.""" hw_id = self._get_gripper_hardware_id(robot_name) if hw_id is None: return False @@ -819,23 +906,673 @@ def get_gripper(self, robot_name: RobotName | None = None) -> float | None: result = client.get_gripper_position(hw_id) return float(result) if result is not None else None - @rpc - def open_gripper(self, robot_name: RobotName | None = None) -> bool: - """Open gripper fully (0.85m opening). + @skill() + def set_gripper(self, position: float, robot_name: str | None = None) -> str: + """Set gripper to a specific opening in meters. Args: - robot_name: Robot to control (required if multiple robots configured) + position: Gripper opening in meters (0.0 = closed, 0.85 = fully open). + robot_name: Robot to control (only needed for multi-arm setups). """ - return bool(self.set_gripper(0.85, robot_name)) + if self._set_gripper_position(position, robot_name): + return f"Gripper set to {position:.3f}m" + return "Error: Failed to set gripper position" - @rpc - def close_gripper(self, robot_name: RobotName | None = None) -> bool: - """Close gripper fully. + @skill() + def open_gripper(self, robot_name: str | None = None) -> str: + """Open the robot gripper fully. + + Args: + robot_name: Robot to control (only needed for multi-arm setups). + """ + if self._set_gripper_position(0.85, robot_name): + return "Gripper opened" + return "Error: Failed to open gripper" + + @skill() + def close_gripper(self, robot_name: str | None = None) -> str: + """Close the robot gripper fully. + + Args: + robot_name: Robot to control (only needed for multi-arm setups). + """ + if self._set_gripper_position(0.0, robot_name): + return "Gripper closed" + return "Error: Failed to close gripper" + + # ========================================================================= + # Skill Helpers (internal) + # ========================================================================= + + def _make_pose( + self, x: float, y: float, z: float, roll: float, pitch: float, yaw: float + ) -> Pose: + """Construct a Pose from position and Euler angles (radians).""" + from dimos.msgs.geometry_msgs import Pose, Vector3 + from dimos.utils.transform_utils import euler_to_quaternion + + orientation = euler_to_quaternion(Vector3(roll, pitch, yaw)) + return Pose(position=Vector3(x, y, z), orientation=orientation) + + def _wait_for_trajectory_completion( + self, robot_name: RobotName | None = None, timeout: float = 60.0, poll_interval: float = 0.2 + ) -> bool: + """Wait for trajectory execution to complete. + + Polls the coordinator task state via task_invoke. Falls back to waiting + for the trajectory duration if the coordinator is unavailable. + + Args: + robot_name: Robot to monitor + timeout: Maximum wait time in seconds + poll_interval: Time between status checks + + Returns: + True if trajectory completed successfully + """ + robot = self._get_robot(robot_name) + if robot is None: + return True + rname, _, config, _ = robot + client = self._get_coordinator_client() + + if client is None or not config.coordinator_task_name: + # No coordinator — wait for trajectory duration as fallback + traj = self._planned_trajectories.get(rname) + if traj is not None: + logger.info(f"No coordinator status — waiting {traj.duration:.1f}s for trajectory") + time.sleep(traj.duration + 0.5) + return True + + # Poll task state via task_invoke + start = time.time() + while (time.time() - start) < timeout: + try: + state = client.task_invoke(config.coordinator_task_name, "get_state", {}) + # TrajectoryState is an IntEnum: IDLE=0, EXECUTING=1, COMPLETED=2, ABORTED=3, FAULT=4 + if state is not None: + state_val = int(state) + if state_val in (0, 2): # IDLE or COMPLETED + return True + if state_val in (3, 4): # ABORTED or FAULT + logger.warning(f"Trajectory failed: state={state}") + return False + # state_val == 1 means EXECUTING, keep polling + else: + # task_invoke returned None — task not found, assume done + return True + except Exception: + # Fallback: wait for trajectory duration + traj = self._planned_trajectories.get(rname) + if traj is not None: + remaining = traj.duration - (time.time() - start) + if remaining > 0: + logger.info(f"Status poll failed — waiting {remaining:.1f}s for trajectory") + time.sleep(remaining + 0.5) + return True + time.sleep(poll_interval) + + logger.warning(f"Trajectory execution timed out after {timeout}s") + return False + + def _preview_execute_wait( + self, robot_name: RobotName | None = None, preview_duration: float = 0.5 + ) -> Generator[str, None, bool]: + """Preview planned path, execute, and wait for completion. + + Yields progress strings. The generator returns True on success, False on failure. + Caller must use ``result = yield from self._preview_execute_wait(...)`` to get the bool. + + Args: + robot_name: Robot to operate on + preview_duration: Duration to animate the preview in Meshcat (seconds) + """ + yield "Previewing trajectory..." + self.preview_path(preview_duration, robot_name) + + yield "Executing trajectory..." + if not self.execute(robot_name): + yield "Error: Trajectory execution failed" + return False + + if not self._wait_for_trajectory_completion(robot_name): + yield "Error: Trajectory execution timed out" + return False + + return True + + def _compute_pre_grasp_pose(self, grasp_pose: Pose, offset: float = 0.10) -> Pose: + """Compute a pre-grasp pose offset along the approach direction (local -Z). + + Args: + grasp_pose: The final grasp pose + offset: Distance to retract along the approach direction (meters) + + Returns: + Pre-grasp pose offset from the grasp pose + """ + from dimos.utils.transform_utils import offset_distance + + return offset_distance(grasp_pose, offset) + + def _find_object_in_detections( + self, object_name: str, object_id: str | None = None + ) -> dict[str, object] | None: + """Find an object in the detection snapshot by name or ID. + + Uses the snapshot taken during the last scan_objects/refresh call, + not the volatile live cache (which changes labels every frame). + + Args: + object_name: Name/label to search for + object_id: Optional specific object ID + + Returns: + Detection dict with position info, or None + """ + detections = self._detection_snapshot + if not detections: + logger.warning("No detection snapshot — call scan_objects() first") + return None + + for det in detections: + if object_id and str(det.get("object_id", "")) == str(object_id): + return dict(det) + name = str(det.get("name", "")).lower() + if object_name.lower() in name or name in object_name.lower(): + return dict(det) + + available = [str(d.get("name", "?")) for d in detections] + logger.warning(f"Object '{object_name}' not found in snapshot. Available: {available}") + return None + + def _generate_grasps_for_pick( + self, object_name: str, object_id: str | None = None + ) -> list[Pose] | None: + """Generate grasp poses for an object via GraspingModule RPC. + + Falls back to a simple top-down approach using the object's detected position + if GraspGen is unavailable. + + Args: + object_name: Name of the object + object_id: Optional object ID + + Returns: + List of grasp poses (best first), or None if object not found + """ + # Try GraspGen via rpc_calls if available + try: + generate = self.get_rpc_calls("GraspingModule.generate_grasps") + result_str = generate(object_name, object_id, True) + # If generate_grasps returned actual PoseArray via the grasps port, + # we need to get the poses. For now, check if it returned an error string. + if isinstance(result_str, str) and "No" in result_str: + logger.info(f"GraspGen returned: {result_str}, falling back to heuristic") + else: + # GraspGen succeeded — get poses from the grasps port or RPC + logger.info(f"GraspGen result: {result_str}") + # Try to get the grasp poses via RPC + try: + get_grasps = self.get_rpc_calls("GraspingModule.get_latest_grasps") + grasp_poses: PoseArray | None = get_grasps() + if grasp_poses and len(grasp_poses.poses) > 0: + return list(grasp_poses.poses) + except Exception: + pass + except Exception as e: + logger.debug(f"GraspGen not available ({e}), using heuristic approach") + + # Fallback: compute a simple top-down grasp from the detected object position + det = self._find_object_in_detections(object_name, object_id) + if det is None: + logger.warning(f"Object '{object_name}' not found in detections") + return None + + center = det.get("center") + if not isinstance(center, (list, tuple)) or len(center) < 3: + logger.warning(f"Object '{object_name}' has no position data") + return None + + x = float(center[0]) + y = float(center[1]) + z = float(center[2]) + + # Generate a top-down grasp pose at the object centroid + grasp_pose = self._make_pose(x, y, z, 0.0, math.pi, 0.0) + logger.info(f"Heuristic grasp for '{object_name}' at ({x:.3f}, {y:.3f}, {z:.3f})") + return [grasp_pose] + + # ========================================================================= + # Tier 2 Skills — Individually testable actions + # ========================================================================= + + @skill() + def move_to_pose( + self, + x: float, + y: float, + z: float, + roll: float = 0.0, + pitch: float = 0.0, + yaw: float = 0.0, + robot_name: str | None = None, + ) -> Generator[str, None, None]: + """Move the robot end-effector to a target pose. + + Plans a collision-free trajectory and executes it. + + Args: + x: Target X position in meters. + y: Target Y position in meters. + z: Target Z position in meters. + roll: Target roll in radians (default 0). + pitch: Target pitch in radians (default 0). + yaw: Target yaw in radians (default 0). + robot_name: Robot to move (only needed for multi-arm setups). + """ + yield f"Planning motion to ({x:.3f}, {y:.3f}, {z:.3f})..." + pose = self._make_pose(x, y, z, roll, pitch, yaw) + + if not self.plan_to_pose(pose, robot_name): + yield f"Error: Planning failed — pose ({x:.3f}, {y:.3f}, {z:.3f}) may be unreachable or in collision" + return + + ok = yield from self._preview_execute_wait(robot_name) + if not ok: + return + + yield f"Reached target pose ({x:.3f}, {y:.3f}, {z:.3f})" + + @skill() + def move_to_joints( + self, + joints: str, + robot_name: str | None = None, + ) -> Generator[str, None, None]: + """Move the robot to a target joint configuration. + + Plans a collision-free trajectory and executes it. + + Args: + joints: Comma-separated joint positions in radians, e.g. "0.1, -0.5, 1.2, 0.0, 0.3, -0.1". + robot_name: Robot to move (only needed for multi-arm setups). + """ + try: + joint_values = [float(j.strip()) for j in joints.split(",")] + except ValueError: + yield f"Error: Invalid joints format '{joints}'. Expected comma-separated floats." + return + + yield f"Planning motion to joints [{', '.join(f'{j:.3f}' for j in joint_values)}]..." + if not self.plan_to_joints(joint_values, robot_name): + yield "Error: Planning failed — joint configuration may be unreachable or in collision" + return + + ok = yield from self._preview_execute_wait(robot_name) + if not ok: + return + + yield "Reached target joint configuration" + + @skill() + def get_scene_info(self, robot_name: str | None = None) -> str: + """Get current robot state, detected objects, and scene information. + + Returns a summary of the robot's joint positions, end-effector pose, + gripper state, detected objects, and obstacle count. + + Args: + robot_name: Robot to query (only needed for multi-arm setups). + """ + lines: list[str] = [] + + # Robot state + joints = self.get_current_joints(robot_name) + if joints is not None: + lines.append(f"Joints: [{', '.join(f'{j:.3f}' for j in joints)}]") + else: + lines.append("Joints: unavailable (no state received)") + + ee_pose = self.get_ee_pose(robot_name) + if ee_pose is not None: + p = ee_pose.position + lines.append(f"EE pose: ({p.x:.4f}, {p.y:.4f}, {p.z:.4f})") + else: + lines.append("EE pose: unavailable") + + # Gripper + gripper_pos = self.get_gripper(robot_name) + if gripper_pos is not None: + lines.append(f"Gripper: {gripper_pos:.3f}m") + else: + lines.append("Gripper: not configured") + + # Perception + perception = self.get_perception_status() + lines.append( + f"Perception: {perception.get('cached', 0)} cached, {perception.get('added', 0)} obstacles added" + ) + + detections = self.list_cached_detections() + if detections: + lines.append(f"Detected objects ({len(detections)}):") + for det in detections: + name = det.get("name", "unknown") + center = det.get("center") + if isinstance(center, (list, tuple)) and len(center) >= 3: + lines.append(f" - {name}: ({center[0]:.3f}, {center[1]:.3f}, {center[2]:.3f})") + else: + lines.append(f" - {name}") + else: + lines.append("Detected objects: none") + + # Visualization + url = self.get_visualization_url() + if url: + lines.append(f"Visualization: {url}") + + # State + lines.append(f"State: {self.get_state()}") + + return "\n".join(lines) + + @skill() + def scan_objects(self, min_duration: float = 1.0, robot_name: str | None = None) -> str: + """Scan the scene and list detected objects with their 3D positions. + + Refreshes perception obstacles from the latest sensor data and returns + a formatted list of all detected objects. + + Args: + min_duration: Minimum time in seconds to wait for stable detections. + robot_name: Robot context (only needed for multi-arm setups). + """ + obstacles = self.refresh_obstacles(min_duration) + + detections = self.list_cached_detections() + if not detections: + return "No objects detected in scene" + + lines = [f"Detected {len(detections)} object(s):"] + for det in detections: + name = det.get("name", "unknown") + center = det.get("center") + if isinstance(center, (list, tuple)) and len(center) >= 3: + lines.append(f" - {name}: ({center[0]:.3f}, {center[1]:.3f}, {center[2]:.3f})") + else: + lines.append(f" - {name}") + + if obstacles: + lines.append(f"\n{len(obstacles)} obstacle(s) added to planning world") + + return "\n".join(lines) + + # ========================================================================= + # Tier 3 Skills — Composed high-level behaviors + # ========================================================================= + + @skill() + def go_home(self, robot_name: str | None = None) -> Generator[str, None, None]: + """Move the robot to its home/observe joint configuration. + + Opens the gripper and moves to the predefined home position. + + Args: + robot_name: Robot to move (only needed for multi-arm setups). + """ + robot = self._get_robot(robot_name) + if robot is None: + yield "Error: Robot not found" + return + _, _, config, _ = robot + + if config.home_joints is None: + yield "Error: No home_joints configured for this robot" + return + + yield "Opening gripper..." + self._set_gripper_position(0.85, robot_name) + time.sleep(0.5) + + yield "Planning motion to home position..." + if not self.plan_to_joints(config.home_joints, robot_name): + yield "Error: Failed to plan path to home position" + return + + ok = yield from self._preview_execute_wait(robot_name) + if not ok: + return + + yield "Reached home position" + + @skill() + def go_init(self, robot_name: str | None = None) -> Generator[str, None, None]: + """Move the robot to its init position (captured at startup or set manually). + + The init position is the joint configuration the robot was in when the + module first received joint state. It can be changed with set_init_joints(). + + Args: + robot_name: Robot to move (only needed for multi-arm setups). + """ + if self._init_joints is None: + yield "Error: No init joints captured — robot may not have reported joint state yet" + return + + yield f"Planning motion to init position [{', '.join(f'{j:.3f}' for j in self._init_joints)}]..." + if not self.plan_to_joints(self._init_joints, robot_name): + yield "Error: Failed to plan path to init position" + return + + ok = yield from self._preview_execute_wait(robot_name) + if not ok: + return + + yield "Reached init position" + + @skill() + def pick( + self, + object_name: str, + object_id: str | None = None, + robot_name: str | None = None, + ) -> Generator[str, None, None]: + """Pick up an object by name using grasp planning and motion execution. + + Scans the scene, generates grasp poses (via GraspGen or heuristic fallback), + plans collision-free approach/grasp/retract motions, and executes them. + + Args: + object_name: Name of the object to pick (e.g. "cup", "bottle", "can"). + object_id: Optional unique object ID from perception for precise identification. + robot_name: Robot to use (only needed for multi-arm setups). + """ + robot = self._get_robot(robot_name) + if robot is None: + yield "Error: Robot not found" + return + rname, _, config, _ = robot + pre_grasp_offset = config.pre_grasp_offset + + # 1. Generate grasps (uses already-cached detections — call scan_objects first) + yield f"Generating grasp poses for '{object_name}'..." + grasp_poses = self._generate_grasps_for_pick(object_name, object_id) + if not grasp_poses: + yield f"Error: No grasp poses found for '{object_name}'. Object may not be detected." + return + + # 3. Try each grasp candidate + max_attempts = min(len(grasp_poses), 5) + for i, grasp_pose in enumerate(grasp_poses[:max_attempts]): + pre_grasp_pose = self._compute_pre_grasp_pose(grasp_pose, pre_grasp_offset) + + yield f"Planning approach to pre-grasp (attempt {i + 1}/{max_attempts})..." + if not self.plan_to_pose(pre_grasp_pose, rname): + logger.info(f"Grasp candidate {i + 1} approach planning failed, trying next") + continue # Try next candidate + + # Open gripper before approach + yield "Opening gripper..." + self._set_gripper_position(0.85, rname) + time.sleep(0.5) + + # 4. Preview + execute approach + ok = yield from self._preview_execute_wait(rname) + if not ok: + return + + # 5. Move to grasp pose + yield "Moving to grasp position..." + if not self.plan_to_pose(grasp_pose, rname): + yield "Error: Grasp pose planning failed" + return + ok = yield from self._preview_execute_wait(rname) + if not ok: + return + + # 6. Close gripper + yield "Closing gripper..." + self._set_gripper_position(0.0, rname) + time.sleep(1.5) # Wait for gripper to close + + # 7. Retract to pre-grasp + yield "Retracting with object..." + if not self.plan_to_pose(pre_grasp_pose, rname): + yield "Error: Retract planning failed" + return + ok = yield from self._preview_execute_wait(rname) + if not ok: + return + + # Store pick position so place_back() can return the object + p = grasp_pose.position + self._last_pick_position = (p.x, p.y, p.z) + + yield f"Pick complete — grasped '{object_name}' successfully" + return + + yield f"Error: All {max_attempts} grasp attempts failed for '{object_name}'" + + @skill() + def place( + self, + x: float, + y: float, + z: float, + robot_name: str | None = None, + ) -> Generator[str, None, None]: + """Place a held object at the specified position. + + Plans and executes an approach, lowers to the target, releases the gripper, + and retracts. + + Args: + x: Target X position in meters. + y: Target Y position in meters. + z: Target Z position in meters. + robot_name: Robot to use (only needed for multi-arm setups). + """ + robot = self._get_robot(robot_name) + if robot is None: + yield "Error: Robot not found" + return + rname, _, config, _ = robot + pre_place_offset = config.pre_grasp_offset + + # Compute place pose (top-down approach) + place_pose = self._make_pose(x, y, z, 0.0, math.pi, 0.0) + pre_place_pose = self._compute_pre_grasp_pose(place_pose, pre_place_offset) + + # 1. Move to pre-place + yield f"Planning approach to place position ({x:.3f}, {y:.3f}, {z:.3f})..." + if not self.plan_to_pose(pre_place_pose, rname): + yield "Error: Pre-place approach planning failed" + return + + ok = yield from self._preview_execute_wait(rname) + if not ok: + return + + # 2. Lower to place position + yield "Lowering to place position..." + if not self.plan_to_pose(place_pose, rname): + yield "Error: Place pose planning failed" + return + ok = yield from self._preview_execute_wait(rname) + if not ok: + return + + # 3. Release + yield "Releasing object..." + self._set_gripper_position(0.85, rname) + time.sleep(1.0) + + # 4. Retract + yield "Retracting..." + if not self.plan_to_pose(pre_place_pose, rname): + yield "Error: Retract planning failed" + return + ok = yield from self._preview_execute_wait(rname) + if not ok: + return + + yield f"Place complete — object released at ({x:.3f}, {y:.3f}, {z:.3f})" + + @skill() + def place_back(self, robot_name: str | None = None) -> Generator[str, None, None]: + """Place the held object back at its original pick position. + + Uses the position stored from the last successful pick operation. + + Args: + robot_name: Robot to use (only needed for multi-arm setups). + """ + if self._last_pick_position is None: + yield "Error: No previous pick position stored — run pick() first" + return + + x, y, z = self._last_pick_position + yield f"Placing back at original position ({x:.3f}, {y:.3f}, {z:.3f})..." + yield from self.place(x, y, z, robot_name) + + @skill() + def pick_and_place( + self, + object_name: str, + place_x: float, + place_y: float, + place_z: float, + object_id: str | None = None, + robot_name: str | None = None, + ) -> Generator[str, None, None]: + """Pick up an object and place it at a target location. + + Combines the pick and place skills into a single end-to-end operation. Args: - robot_name: Robot to control (required if multiple robots configured) + object_name: Name of the object to pick (e.g. "cup", "bottle"). + place_x: Target X position to place the object (meters). + place_y: Target Y position to place the object (meters). + place_z: Target Z position to place the object (meters). + object_id: Optional unique object ID from perception. + robot_name: Robot to use (only needed for multi-arm setups). """ - return bool(self.set_gripper(0.0, robot_name)) + # Pick phase + yield f"Starting pick and place: pick '{object_name}' → place at ({place_x:.3f}, {place_y:.3f}, {place_z:.3f})" + for msg in self.pick(object_name, object_id, robot_name): + yield msg + # Check if pick failed + if msg.startswith("Error:"): + return + + # Place phase (go directly from pick retract to place — no go_init in between) + for msg in self.place(place_x, place_y, place_z, robot_name): + yield msg + if msg.startswith("Error:"): + return + + yield f"Pick and place complete — '{object_name}' placed at ({place_x:.3f}, {place_y:.3f}, {place_z:.3f})" # ========================================================================= # Lifecycle diff --git a/dimos/manipulation/planning/spec/config.py b/dimos/manipulation/planning/spec/config.py index 3207be97a..dc302689e 100644 --- a/dimos/manipulation/planning/spec/config.py +++ b/dimos/manipulation/planning/spec/config.py @@ -76,6 +76,10 @@ class RobotModelConfig: gripper_hardware_id: str | None = None # TF publishing for extra links (e.g., camera mount) tf_extra_links: list[str] = field(default_factory=list) + # Home/observe joint configuration for go_home skill + home_joints: list[float] | None = None + # Pre-grasp offset distance in meters (along approach direction) + pre_grasp_offset: float = 0.10 def get_urdf_joint_name(self, coordinator_name: str) -> str: """Translate coordinator joint name to URDF joint name.""" diff --git a/pyproject.toml b/pyproject.toml index e6b542a65..111fa8f60 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -408,6 +408,7 @@ ignore = [ "*/package-lock.json", "dimos/dashboard/dimos.rbl", "dimos/web/dimos_interface/themes.json", + "dimos/manipulation/manipulation_module.py", ] [tool.uv] From cc29e835768e42938849a4c177e82d9ec50d0129 Mon Sep 17 00:00:00 2001 From: mustafab0 Date: Wed, 11 Feb 2026 14:14:47 -0800 Subject: [PATCH 6/9] added xarm-perception-agent blueprint --- dimos/manipulation/manipulation_blueprints.py | 52 +++++++++++++++---- dimos/robot/all_blueprints.py | 1 + 2 files changed, 44 insertions(+), 9 deletions(-) diff --git a/dimos/manipulation/manipulation_blueprints.py b/dimos/manipulation/manipulation_blueprints.py index 85fe4d6fe..211ab33cd 100644 --- a/dimos/manipulation/manipulation_blueprints.py +++ b/dimos/manipulation/manipulation_blueprints.py @@ -16,20 +16,20 @@ Blueprints for manipulation module integration with ControlCoordinator. Usage: - # Start coordinator first, then planner: - coordinator = xarm7_planner_coordinator.build() - coordinator.loop() - - # Plan and execute via RPC client: - from dimos.manipulation.planning.examples.manipulation_client import ManipulationClient - client = ManipulationClient() - client.plan_to_joints([0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7]) - client.execute() + # Non-agentic (manual RPC): + dimos run coordinator-mock + dimos run xarm-perception + + # Agentic (LLM agent with skills): + dimos run coordinator-mock + dimos run xarm-perception-agent """ import math from pathlib import Path +from dimos.agents.agent import llm_agent +from dimos.agents.cli.human import human_input from dimos.core.blueprints import autoconnect from dimos.core.transport import LCMTransport from dimos.hardware.sensors.camera.realsense import realsense_camera @@ -232,6 +232,8 @@ def _make_xarm7_config( coordinator_task_name=coordinator_task, gripper_hardware_id=gripper_hardware_id, tf_extra_links=tf_extra_links or [], + # Home configuration: arm extended forward, elbow up (safe observe pose) + home_joints=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], ) @@ -366,6 +368,37 @@ def _make_piper_config( ) +# XArm7 perception + LLM agent for agentic manipulation +# Skills (pick, place, move_to_pose, etc.) auto-register with the agent's SkillCoordinator. +# Usage: dimos run coordinator-mock, then dimos run xarm-perception-agent +_MANIPULATION_AGENT_SYSTEM_PROMPT = """\ +You are a robotic manipulation assistant controlling an xArm7 robot arm. + +Use ONLY these ManipulationModule skills for manipulation tasks: +- scan_objects: Scan scene and list detected objects with 3D positions. Always call this first. +- pick: Pick up an object by name. Requires scan_objects first. +- place: Place a held object at x, y, z position. +- place_back: Place a held object back at its original pick position. +- pick_and_place: Pick an object and place it at a target location. +- move_to_pose: Move end-effector to x, y, z with optional roll, pitch, yaw. +- move_to_joints: Move to a joint configuration (comma-separated radians). +- open_gripper / close_gripper / set_gripper: Control the gripper. +- go_home: Move to the home/observe position. +- go_init: Return to the startup position. +- get_scene_info: Get full robot state, detected objects, and scene info. + +Do NOT use the 'detect' or 'select' skills — use scan_objects instead. +For robot_name parameters, always omit or pass None (single-arm setup). +After pick or place, return to init with go_init unless another action follows immediately. +""" + +xarm_perception_agent = autoconnect( + xarm_perception, + llm_agent(system_prompt=_MANIPULATION_AGENT_SYSTEM_PROMPT), + human_input(), +) + + __all__ = [ "PIPER_GRIPPER_COLLISION_EXCLUSIONS", "XARM_GRIPPER_COLLISION_EXCLUSIONS", @@ -373,4 +406,5 @@ def _make_piper_config( "xarm6_planner_only", "xarm7_planner_coordinator", "xarm_perception", + "xarm_perception_agent", ] diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index a4a953062..3c53953c8 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -62,6 +62,7 @@ "unitree-go2-temporal-memory": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:unitree_go2_temporal_memory", "unitree-go2-vlm-stream-test": "dimos.robot.unitree_webrtc.unitree_go2_blueprints:unitree_go2_vlm_stream_test", "xarm-perception": "dimos.manipulation.manipulation_blueprints:xarm_perception", + "xarm-perception-agent": "dimos.manipulation.manipulation_blueprints:xarm_perception_agent", "xarm6-planner-only": "dimos.manipulation.manipulation_blueprints:xarm6_planner_only", "xarm7-planner-coordinator": "dimos.manipulation.manipulation_blueprints:xarm7_planner_coordinator", } From 550f4acefa602c619f5829c3d2ea69785a02e062 Mon Sep 17 00:00:00 2001 From: mustafab0 Date: Wed, 11 Feb 2026 14:15:00 -0800 Subject: [PATCH 7/9] deprecated manipulation client --- .../planning/examples/__init__.py | 20 +- .../planning/examples/manipulation_client.py | 504 ------------------ 2 files changed, 1 insertion(+), 523 deletions(-) delete mode 100644 dimos/manipulation/planning/examples/manipulation_client.py diff --git a/dimos/manipulation/planning/examples/__init__.py b/dimos/manipulation/planning/examples/__init__.py index 8c76a4a52..7971835da 100644 --- a/dimos/manipulation/planning/examples/__init__.py +++ b/dimos/manipulation/planning/examples/__init__.py @@ -13,23 +13,5 @@ # limitations under the License. """ -Manipulation Client - RPC client for ManipulationModule - -Usage: - # Start coordinator and planner first: - dimos run coordinator-mock - dimos run xarm7-planner-coordinator - - # Then run the interactive client: - python -m dimos.manipulation.planning.examples.manipulation_client - - # IPython shell with client pre-loaded: - c.joints() # Get current joint positions - c.plan([0.1, ...]) # Plan to joints - c.preview() # Preview in Meshcat - c.execute() # Execute via coordinator +Manipulation planning examples. """ - -from dimos.manipulation.planning.examples.manipulation_client import ManipulationClient - -__all__ = ["ManipulationClient"] diff --git a/dimos/manipulation/planning/examples/manipulation_client.py b/dimos/manipulation/planning/examples/manipulation_client.py deleted file mode 100644 index 05f5cbd03..000000000 --- a/dimos/manipulation/planning/examples/manipulation_client.py +++ /dev/null @@ -1,504 +0,0 @@ -#!/usr/bin/env python3 -# Copyright 2025-2026 Dimensional Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Manipulation Client - IPython interface for ManipulationModule - -Usage: - # Start coordinator and planner first: - dimos run coordinator-mock - dimos run xarm7-planner-coordinator - - # Run interactive client: - python -m dimos.manipulation.planning.examples.manipulation_client - -Commands (call directly, no prefix needed): - joints() # Get current joint positions - ee() # Get end-effector pose - state() # Get manipulation state - url() # Get Meshcat visualization URL - - plan([0.1, ...]) # Plan to joint config - plan_pose(x, y, z) # Plan to cartesian pose - preview() # Preview path in Meshcat - execute() # Execute via coordinator - - box("name", x, y, z, w, h, d) # Add box obstacle - sphere("name", x, y, z, r) # Add sphere obstacle - cylinder("name", x, y, z, r, l) # Add cylinder obstacle - remove("obstacle_id") # Remove obstacle -""" - -from __future__ import annotations - -from typing import Any, cast - -from dimos.msgs.geometry_msgs import Pose, Quaternion, Vector3 -from dimos.protocol.rpc import LCMRPC - - -class ManipulationClient: - """RPC client for ManipulationModule with IPython-friendly API.""" - - def __init__(self) -> None: - self._rpc = LCMRPC() - self._rpc.start() - self._module = "ManipulationModule" - self._cached_detections: list[dict[str, object]] = [] - print("Connected to ManipulationModule via LCM RPC") - - def _call(self, method: str, *args: Any, **kwargs: Any) -> Any: - """Call RPC method.""" - try: - result, _ = self._rpc.call_sync( - f"{self._module}/{method}", (list(args), kwargs), rpc_timeout=30.0 - ) - return result - except TimeoutError: - print(f"Timeout: {method}") - return None - except Exception as e: - print(f"Error: {e}") - return None - - # ========================================================================= - # Query Methods - # ========================================================================= - - def state(self) -> str: - """Get manipulation state.""" - return cast("str", self._call("get_state")) - - def joints(self, robot_name: str | None = None) -> list[float] | None: - """Get current joint positions. - - Args: - robot_name: Robot to query (required if multiple robots configured) - """ - return cast("list[float] | None", self._call("get_current_joints", robot_name)) - - def ee(self, robot_name: str | None = None) -> Pose | None: - """Get end-effector pose. - - Args: - robot_name: Robot to query (required if multiple robots configured) - """ - return cast("Pose | None", self._call("get_ee_pose", robot_name)) - - def url(self) -> str | None: - """Get Meshcat visualization URL.""" - return cast("str | None", self._call("get_visualization_url")) - - def robots(self) -> list[str]: - """List configured robots.""" - return cast("list[str]", self._call("list_robots")) - - def info(self, robot_name: str | None = None) -> dict[str, Any] | None: - """Get robot info.""" - return cast("dict[str, Any] | None", self._call("get_robot_info", robot_name)) - - # ========================================================================= - # Planning Methods - # ========================================================================= - - def plan(self, joints: list[float], robot_name: str | None = None) -> bool: - """Plan to joint configuration. - - Args: - joints: Target joint configuration - robot_name: Robot to plan for (required if multiple robots configured) - """ - print( - f"Planning to: {[f'{j:.3f}' for j in joints]}" - + (f" for {robot_name}" if robot_name else "") - ) - return cast("bool", self._call("plan_to_joints", joints, robot_name=robot_name)) - - def plan_pose( - self, - x: float, - y: float, - z: float, - roll: float | None = None, - pitch: float | None = None, - yaw: float | None = None, - robot_name: str | None = None, - ) -> bool: - """Plan to cartesian pose. Uses current orientation if not specified. - - Args: - x, y, z: Target position - roll, pitch, yaw: Target orientation (uses current if not specified) - robot_name: Robot to plan for (required if multiple robots configured) - """ - # Get current orientation if not provided - if roll is None or pitch is None or yaw is None: - ee = self.ee(robot_name) - if ee is None: - print("Cannot get current orientation - specify roll, pitch, yaw explicitly") - return False - roll = roll if roll is not None else ee.roll - pitch = pitch if pitch is not None else ee.pitch - yaw = yaw if yaw is not None else ee.yaw - - print( - f"Planning to: ({x:.3f}, {y:.3f}, {z:.3f}) rpy=({roll:.2f}, {pitch:.2f}, {yaw:.2f})" - + (f" for {robot_name}" if robot_name else "") - ) - pose = Pose( - position=Vector3(x, y, z), - orientation=Quaternion.from_euler(Vector3(roll, pitch, yaw)), - ) - return cast("bool", self._call("plan_to_pose", pose, robot_name=robot_name)) - - def preview(self, duration: float = 3.0, robot_name: str | None = None) -> bool: - """Preview planned path in Meshcat. - - Args: - duration: Animation duration in seconds - robot_name: Robot to preview (required if multiple robots configured) - """ - return cast("bool", self._call("preview_path", duration, robot_name=robot_name)) - - def execute(self, robot_name: str | None = None) -> bool: - """Execute planned trajectory via coordinator.""" - return cast("bool", self._call("execute", robot_name)) - - def has_plan(self) -> bool: - """Check if path is planned.""" - return cast("bool", self._call("has_planned_path")) - - def clear_plan(self) -> bool: - """Clear planned path.""" - return cast("bool", self._call("clear_planned_path")) - - # ========================================================================= - # Obstacle Methods - # ========================================================================= - - def box( - self, - name: str, - x: float, - y: float, - z: float, - w: float, - h: float, - d: float, - roll: float = 0.0, - pitch: float = 0.0, - yaw: float = 0.0, - ) -> str: - """Add box obstacle.""" - pose = Pose( - position=Vector3(x, y, z), - orientation=Quaternion.from_euler(Vector3(roll, pitch, yaw)), - ) - return cast("str", self._call("add_obstacle", name, pose, "box", [w, h, d])) - - def sphere(self, name: str, x: float, y: float, z: float, radius: float) -> str: - """Add sphere obstacle.""" - pose = Pose(position=Vector3(x, y, z)) - return cast("str", self._call("add_obstacle", name, pose, "sphere", [radius])) - - def cylinder( - self, name: str, x: float, y: float, z: float, radius: float, length: float - ) -> str: - """Add cylinder obstacle.""" - pose = Pose(position=Vector3(x, y, z)) - return cast("str", self._call("add_obstacle", name, pose, "cylinder", [radius, length])) - - def remove(self, obstacle_id: str) -> bool: - """Remove obstacle.""" - return cast("bool", self._call("remove_obstacle", obstacle_id)) - - # ========================================================================= - # Gripper Methods - # ========================================================================= - - def set_gripper(self, position: float, robot_name: str | None = None) -> bool: - """Set gripper position in meters. - - Args: - position: Gripper position in meters - robot_name: Robot to control (required if multiple robots configured) - """ - return cast("bool", self._call("set_gripper", position, robot_name=robot_name)) - - def get_gripper(self, robot_name: str | None = None) -> float | None: - """Get gripper position in meters. - - Args: - robot_name: Robot to query (required if multiple robots configured) - """ - return cast("float | None", self._call("get_gripper", robot_name=robot_name)) - - def open_gripper(self, robot_name: str | None = None) -> bool: - """Open gripper fully. - - Args: - robot_name: Robot to control (required if multiple robots configured) - """ - return cast("bool", self._call("open_gripper", robot_name=robot_name)) - - def close_gripper(self, robot_name: str | None = None) -> bool: - """Close gripper fully. - - Args: - robot_name: Robot to control (required if multiple robots configured) - """ - return cast("bool", self._call("close_gripper", robot_name=robot_name)) - - # ========================================================================= - # Perception Methods - # ========================================================================= - - def perception(self) -> dict[str, int] | None: - """Get perception status (cached/added counts).""" - return cast("dict[str, int] | None", self._call("get_perception_status")) - - def detections(self) -> list[dict[str, object]] | None: - """List cached detections from perception.""" - result = cast("list[dict[str, object]] | None", self._call("list_cached_detections")) - if result: - for i, det in enumerate(result): - center = cast("list[float]", det.get("center", [0, 0, 0])) - print( - f" [{i}] {det.get('name', '?'):12s} " - f"center=({center[0]:+.3f}, {center[1]:+.3f}, {center[2]:+.3f}) " - f"dur={det.get('duration', 0)}s " - f"{'[IN WORLD]' if det.get('in_world') else ''}" - ) - return result - - def obstacles(self) -> list[dict[str, object]] | None: - """List perception obstacles currently in the planning world.""" - return cast("list[dict[str, object]] | None", self._call("list_added_obstacles")) - - def refresh(self, min_duration: float = 0.0) -> list[dict[str, object]]: - """Refresh perception obstacles and snapshot locally.""" - result = self._call("refresh_obstacles", min_duration) - self._cached_detections = result or [] - print(f"Refreshed: {len(self._cached_detections)} obstacles in world") - return self._cached_detections - - def clear_perception(self) -> int | None: - """Remove all perception obstacles.""" - return cast("int | None", self._call("clear_perception_obstacles")) - - def goto_object( - self, - target: str | int, - dx: float = 0.0, - dy: float = 0.0, - dz: float = 0.0, - robot_name: str | None = None, - ) -> bool: - """Plan to a detected object's position with offset. - - Args: - target: Object index (int), object_id (str), or class name (str) - dx, dy, dz: Offset from object center in meters - robot_name: Robot to plan for - """ - dets = self._cached_detections - if not dets: - print("No cached detections. Run refresh() first.") - return False - - # Match by index, object_id, or class name - match: dict[str, object] | None = None - if isinstance(target, int): - if 0 <= target < len(dets): - match = dets[target] - else: - print(f"Index {target} out of range (0-{len(dets) - 1})") - return False - else: - # Try object_id first, then class name - for det in dets: - if det.get("object_id") == target: - match = det - break - if match is None: - for det in dets: - if str(det.get("name", "")).lower() == target.lower(): - match = det - break - if match is None: - # Partial match - for det in dets: - if target.lower() in str(det.get("name", "")).lower(): - match = det - break - - if match is None: - print(f"No object matching '{target}'. Available:") - for i, det in enumerate(dets): - print(f" [{i}] {det.get('name', '?')}") - return False - - center = cast("list[float]", match.get("center", [0, 0, 0])) - x, y, z = center[0] + dx, center[1] + dy, center[2] + dz - print( - f"Going to '{match.get('name')}' at " - f"({center[0]:.3f}, {center[1]:.3f}, {center[2]:.3f}) " - f"+ offset ({dx}, {dy}, {dz})" - ) - return self.plan_pose(x, y, z, robot_name=robot_name) - - # ========================================================================= - # Utility Methods - # ========================================================================= - - def collision(self, joints: list[float], robot_name: str | None = None) -> bool: - """Check if joint config is collision-free. - - Args: - joints: Joint configuration to check - robot_name: Robot to check (required if multiple robots configured) - """ - return cast("bool", self._call("is_collision_free", joints, robot_name)) - - def reset(self) -> bool: - """Reset to IDLE state.""" - return cast("bool", self._call("reset")) - - def cancel(self) -> bool: - """Cancel current motion.""" - return cast("bool", self._call("cancel")) - - def status(self, robot_name: str | None = None) -> dict[str, object] | None: - """Get trajectory execution status.""" - return cast("dict[str, object] | None", self._call("get_trajectory_status", robot_name)) - - def stop(self) -> None: - """Stop RPC client.""" - self._rpc.stop() - - def __repr__(self) -> str: - return f"ManipulationClient(state={self.state()})" - - -def main() -> None: - """Start IPython shell with ManipulationClient.""" - try: - from IPython import embed - except ImportError: - print("IPython not installed. Run: pip install ipython") - return - - c = ManipulationClient() - - # Expose methods directly in user namespace (no 'c.' prefix needed) - user_ns = { - "c": c, - "client": c, - # Query methods - "state": c.state, - "joints": c.joints, - "ee": c.ee, - "url": c.url, - "robots": c.robots, - "info": c.info, - # Planning methods - "plan": c.plan, - "plan_pose": c.plan_pose, - "preview": c.preview, - "execute": c.execute, - "has_plan": c.has_plan, - "clear_plan": c.clear_plan, - # Obstacle methods - "box": c.box, - "sphere": c.sphere, - "cylinder": c.cylinder, - "remove": c.remove, - # Gripper methods - "set_gripper": c.set_gripper, - "get_gripper": c.get_gripper, - "open_gripper": c.open_gripper, - "close_gripper": c.close_gripper, - # Perception methods - "perception": c.perception, - "detections": c.detections, - "obstacles": c.obstacles, - "refresh": c.refresh, - "clear_perception": c.clear_perception, - "goto_object": c.goto_object, - # Utility methods - "collision": c.collision, - "reset": c.reset, - "cancel": c.cancel, - "status": c.status, - } - - banner = """ -Manipulation Client - IPython Interface -======================================== - -Commands (no prefix needed): - joints() # Get joint positions - ee() # Get end-effector pose - url() # Get Meshcat URL - state() # Get manipulation state - robots() # List configured robots - -Planning: - plan([0.1, 0.2, ...]) # Plan to joint config - plan_pose(0.4, 0, 0.3) # Plan to cartesian pose (keeps orientation) - plan_pose(0.4, 0, 0.3, roll=0, pitch=3.14, yaw=0) # With orientation - preview() # Preview path in Meshcat - execute() # Execute via coordinator - -Obstacles: - box("name", x, y, z, width, height, depth) # Add box - box("table", 0.5, 0, -0.02, 1.0, 0.6, 0.04) # Example: table - sphere("name", x, y, z, radius) # Add sphere - cylinder("name", x, y, z, radius, length) # Add cylinder - remove("obstacle_id") # Remove obstacle - -Gripper: - open_gripper() # Open gripper fully - close_gripper() # Close gripper fully - set_gripper(0.05) # Set gripper position (meters) - get_gripper() # Get gripper position (meters) - -Perception: - perception() # Get status (cached/added counts) - detections() # List cached detections - refresh() # Snapshot detections as obstacles - refresh(5) # Only objects seen >= 5 seconds - obstacles() # List obstacles in planning world - clear_perception() # Remove all perception obstacles - goto_object("cup") # Plan to object by name - goto_object(0, dz=0.1) # Plan to object by index with Z offset - -Utility: - collision([0.1, ...]) # Check if config is collision-free - reset() # Reset to IDLE state - cancel() # Cancel current motion - -Type help(command) for details, e.g. help(box) -""" - print(banner) - - try: - embed(user_ns=user_ns, colors="neutral") # type: ignore[no-untyped-call] - finally: - c.stop() - - -if __name__ == "__main__": - main() From 697bc6e8ff0a24b13583f92929bb6bbf1d78792a Mon Sep 17 00:00:00 2001 From: mustafab0 Date: Wed, 11 Feb 2026 17:34:56 -0800 Subject: [PATCH 8/9] =?UTF-8?q?=20typo=20=E2=80=94=20get=5Fstatus=20should?= =?UTF-8?q?=20be=20get=5Fstate.=20Fixed?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dimos/manipulation/control/coordinator_client.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dimos/manipulation/control/coordinator_client.py b/dimos/manipulation/control/coordinator_client.py index d4c717072..5cdc62a29 100644 --- a/dimos/manipulation/control/coordinator_client.py +++ b/dimos/manipulation/control/coordinator_client.py @@ -122,9 +122,9 @@ def get_joint_positions(self) -> dict[str, float]: def get_trajectory_status(self, task_name: str) -> dict[str, Any]: """Get status of a trajectory task via task_invoke.""" - result = self._rpc.task_invoke(task_name, "get_status", {}) - if isinstance(result, dict): - return result + result = self._rpc.task_invoke(task_name, "get_state", {}) + if result is not None: + return {"state": int(result), "task": task_name} return {} # ========================================================================= From 2a69411e25a8d82691684b7a372f9151ee2c4cc1 Mon Sep 17 00:00:00 2001 From: mustafab0 Date: Wed, 11 Feb 2026 17:35:56 -0800 Subject: [PATCH 9/9] removed PoseArray which was unused as graspgen module is yet to be integrated --- dimos/manipulation/manipulation_module.py | 23 +++++++---------------- 1 file changed, 7 insertions(+), 16 deletions(-) diff --git a/dimos/manipulation/manipulation_module.py b/dimos/manipulation/manipulation_module.py index 4e48d3a29..2e7bfed50 100644 --- a/dimos/manipulation/manipulation_module.py +++ b/dimos/manipulation/manipulation_module.py @@ -58,7 +58,7 @@ from collections.abc import Generator from dimos.core.rpc_client import RPCClient - from dimos.msgs.geometry_msgs import Pose, PoseArray + from dimos.msgs.geometry_msgs import Pose logger = setup_logger() @@ -1104,22 +1104,13 @@ def _generate_grasps_for_pick( # Try GraspGen via rpc_calls if available try: generate = self.get_rpc_calls("GraspingModule.generate_grasps") - result_str = generate(object_name, object_id, True) - # If generate_grasps returned actual PoseArray via the grasps port, - # we need to get the poses. For now, check if it returned an error string. - if isinstance(result_str, str) and "No" in result_str: - logger.info(f"GraspGen returned: {result_str}, falling back to heuristic") + result = generate(object_name, object_id, True) + if isinstance(result, str) and "No" in result: + logger.info(f"GraspGen returned: {result}, falling back to heuristic") else: - # GraspGen succeeded — get poses from the grasps port or RPC - logger.info(f"GraspGen result: {result_str}") - # Try to get the grasp poses via RPC - try: - get_grasps = self.get_rpc_calls("GraspingModule.get_latest_grasps") - grasp_poses: PoseArray | None = get_grasps() - if grasp_poses and len(grasp_poses.poses) > 0: - return list(grasp_poses.poses) - except Exception: - pass + logger.info(f"GraspGen result: {result}") + # GraspGen publishes to Out[PoseArray] port — grasps arrive via + # stream subscription, not RPC return. For now, fall through to heuristic. except Exception as e: logger.debug(f"GraspGen not available ({e}), using heuristic approach")