Feature: Basic Agentic Pick and Place - reimplemented manipulation skills #1237
Conversation
Greptile Overview

Greptile Summary

This PR adds an agent-invokable skill layer to manipulation by converting ManipulationModule into a SkillModule. The changes fit into the existing architecture by leaning on the coordinator's task_invoke path.

Confidence Score: 2/5
Sequence Diagram

```mermaid
sequenceDiagram
autonumber
participant User as Human user
participant HI as human_input (CLI)
participant Agent as llm_agent
participant Skills as SkillCoordinator
participant Manip as ManipulationModule (SkillModule)
participant WM as WorldMonitor
participant Planner as RRTConnectPlanner
participant CC as ControlCoordinator
participant Task as Trajectory Task (JointTrajectoryController)
User->>HI: type "pick up the cup"
HI->>Agent: user message
Agent->>Skills: select skill + args
Skills->>Manip: pick(object_name)
Manip->>WM: refresh_obstacles(min_duration)
WM-->>Manip: obstacles added
Manip->>Manip: _generate_grasps_for_pick()
alt GraspingModule RPC wired
Manip->>Manip: get_rpc_calls("GraspingModule.generate_grasps")
Manip-->>Manip: grasp candidates (intended)
else fallback heuristic
Manip->>WM: list_cached_detections()
WM-->>Manip: detections snapshot
Manip-->>Manip: heuristic grasp pose
end
Manip->>Planner: plan_joint_path(start, goal)
Planner-->>Manip: JointPath
Manip->>Manip: generate JointTrajectory
Manip->>CC: task_invoke(task, "execute", {trajectory})
CC->>Task: execute(trajectory)
Task-->>CC: accepted
CC-->>Manip: result
loop wait for completion
Manip->>CC: task_invoke(task, "get_status" or "get_state")
CC->>Task: get_status()/get_state()
Task-->>CC: TrajectoryStatus / None
CC-->>Manip: status
end
Manip-->>Skills: streamed progress strings
Skills-->>Agent: tool results
Agent-->>HI: assistant response
```
Additional Comments (1)
This switched to
I assume I need hardware to test this.
```python
# Snapshotted detections from the last scan_objects/refresh call.
# The live detection cache is volatile (labels change every frame),
# so pick/place use this stable snapshot instead.
self._detection_snapshot: list[dict[str, object]] = []
```
here also idk what object means
This should have been an Object; a confusion between object and Object. Will fix.
There's an issue open to rename all of them https://linear.app/dimensional/issue/DIM-452/rename-object-to-something-else-entity-body-etc
Replaced the object data type with Any to avoid confusion in the interim.
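For illustration, a sketch of how the snapshot could be populated under the interim `Any` typing; `list_cached_detections` appears in the sequence diagram above, but the attribute and field names here are assumptions:

```python
from typing import Any

def scan_objects(self) -> list[str]:
    # Copy the volatile per-frame cache into a stable snapshot so a later
    # pick/place resolves the same labels the user was shown.
    live: list[dict[str, Any]] = self._world_monitor.list_cached_detections()
    self._detection_snapshot = [dict(d) for d in live]
    return [str(d.get("label", "?")) for d in self._detection_snapshot]
```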
```python
def _make_pose(
    self, x: float, y: float, z: float, roll: float, pitch: float, yaw: float
) -> Pose:
    """Construct a Pose from position and Euler angles (radians)."""
    from dimos.msgs.geometry_msgs import Pose, Vector3
    from dimos.utils.transform_utils import euler_to_quaternion

    orientation = euler_to_quaternion(Vector3(roll, pitch, yaw))
    return Pose(position=Vector3(x, y, z), orientation=orientation)
```
Just a helper; removed as an unnecessary part of legacy code from before type checking.
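A hypothetical call, with illustrative values:

```python
import math

# Top-down orientation (pitch = pi) at a point 20 cm above the workspace origin.
pose = self._make_pose(0.3, 0.0, 0.2, 0.0, math.pi, 0.0)
```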
```python
def _wait_for_trajectory_completion(
    self, robot_name: RobotName | None = None, timeout: float = 60.0, poll_interval: float = 0.2
) -> bool:
    """Wait for trajectory execution to complete.

    Polls the coordinator task state via task_invoke. Falls back to waiting
    for the trajectory duration if the coordinator is unavailable.

    Args:
        robot_name: Robot to monitor
        timeout: Maximum wait time in seconds
        poll_interval: Time between status checks

    Returns:
        True if trajectory completed successfully
    """
    robot = self._get_robot(robot_name)
    if robot is None:
        return True
    rname, _, config, _ = robot
    client = self._get_coordinator_client()

    if client is None or not config.coordinator_task_name:
        # No coordinator — wait for trajectory duration as fallback
        traj = self._planned_trajectories.get(rname)
        if traj is not None:
            logger.info(f"No coordinator status — waiting {traj.duration:.1f}s for trajectory")
            time.sleep(traj.duration + 0.5)
        return True
```
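The excerpt stops at the fallback branch. A hypothetical sketch of the polling loop the docstring describes, based on the sequence diagram above; the status strings and the task_invoke return shape are assumptions:

```python
# Poll the coordinator task until the trajectory finishes or the timeout expires.
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
    status = client.task_invoke(config.coordinator_task_name, "get_status", {})
    if status is None:
        # Some controllers expose get_state instead; fall back to it.
        status = client.task_invoke(config.coordinator_task_name, "get_state", {})
    if status == "SUCCEEDED":
        return True
    if status in ("ABORTED", "FAILED"):
        return False
    time.sleep(poll_interval)
return False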
This should be handled by @paul-nechifor's skill coordinator stuff. We should have a universal and generic set of "Skills" or "Processes" within dimOS, all governed by the same coordinator and manager. Fine for now since that isn't done yet.
- dimOS skills are functions that agents can call
- dimOS processes are short-, mid-, or long-running tasks, typically triggered by skill calls (or directly)
The plan (discussed with @leshy) is to expose everything as an MCP server, so the tool coordination will happen through the MCP client. The MCP client can be Claude Code, but we'll also write our own client.
The tools/resources (replacing the skill terminology) will be defined the way MCP expects them, but we can have helper classes for running long-running tools in our code.
Multiple async tool calls can happen through MCP, and it supports updates from the server to the client.
Long-running tool calls will push a stream of updates, like generator skills. This is a little more complex in MCP, but we'll have helper code for it.
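As a rough illustration of the server-to-client updates described above, a minimal sketch using the MCP Python SDK's FastMCP; the server name and the `pick` tool here are hypothetical, not dimOS code:

```python
from mcp.server.fastmcp import Context, FastMCP

mcp = FastMCP("dimos-tools")  # hypothetical server name

@mcp.tool()
async def pick(object_name: str, ctx: Context) -> str:
    """Long-running tool that streams progress updates to the MCP client."""
    steps = ["scanning scene", "planning grasp", "executing trajectory"]
    for i, step in enumerate(steps):
        await ctx.report_progress(i, len(steps))       # progress notification
        await ctx.info(f"pick({object_name}): {step}")  # log message to client
    return f"picked {object_name}"

if __name__ == "__main__":
    mcp.run()
```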
```python
def pick(
    self,
    object_name: str,
    object_id: str | None = None,
    robot_name: str | None = None,
) -> Generator[str, None, None]:
    """Pick up an object by name using grasp planning and motion execution.

    Scans the scene, generates grasp poses (via GraspGen or heuristic fallback),
    plans collision-free approach/grasp/retract motions, and executes them.

    Args:
        object_name: Name of the object to pick (e.g. "cup", "bottle", "can").
        object_id: Optional unique object ID from perception for precise identification.
```
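Called directly, a generator skill like this streams progress strings; in the blueprint the agent relays each one to the user. A hypothetical invocation, with illustrative messages:

```python
# module is a ManipulationModule wired into a running dimos blueprint
for update in module.pick("cup"):
    print(update)  # e.g. "scanning scene", "executing approach", "grasp closed"
```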
@paul-nechifor Flagging for you since skills like this are helpful to see what we're solving for on agents
You only need a RealSense camera to test this PR. All the robot hardware can be simulated using the coordinator-mock blueprint.
```toml
# Hardware SDKs
"piper-sdk",
"xarm-python-sdk>=1.17.0",
```
Should you add something like a pyrealsense2>=2.50.0 dependency here, or in the perception part?
Problem

- ManipulationModule had no skill layer — only low-level RPCs requiring a custom IPython client.
- No way for an LLM agent to invoke pick/place/move skills through the framework.
- RRT planner rejected valid joint states due to floating-point drift past URDF limits (see the sketch below).
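A minimal sketch of the kind of tolerance check the `limit_eps=1e-3` fix in the Solution list describes; the function and parameter names are illustrative, not the planner's actual API:

```python
def within_limits(q: list[float], lower: list[float], upper: list[float],
                  limit_eps: float = 1e-3) -> bool:
    # Accept joint values that drift past the URDF limits by at most
    # limit_eps (e.g. 1.0000001 rad against a 1.0 rad limit).
    return all(lo - limit_eps <= qi <= hi + limit_eps
               for qi, lo, hi in zip(q, lower, upper))
```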
Solution

- Added 11 `@skill()` methods to ManipulationModule (pick, place, move_to_pose, scan_objects, etc.).
- Changed base class to `SkillModule` so skills auto-register with agents via `autoconnect()`.
- Created `xarm-perception-agent` blueprint composing xarm_perception + llm_agent + human_input.
- Added detection snapshot so pick uses stable labels instead of the volatile live cache.
- Added `limit_eps=1e-3` tolerance to RRT planner joint limit validation.
- Removed ManipulationClient and `run_*` RPC wrappers — agent CLI replaces them.
- CoordinatorClient updated to route execution through `task_invoke` instead of the removed RPCs.

Breaking Changes

- `ManipulationClient` deleted — use `dimos run xarm-perception-agent` or direct RPC.
- `run_pick`/`run_place`/`run_place_back`/`run_go_init` RPCs removed from ManipulationModule.

How to Test

1. `dimos run coordinator-mock` then `dimos run xarm-perception-agent`
2. `scan the scene` — verify objects listed with 3D positions
3. `pick up the <object>` — verify approach, grasp, retract sequence
4. `place it back` — verify placement at original pick position
5. `dimos agentspy`

closes DIM-351
closes DIM-419