Feature: Control Coordinator support for mobile base#1277
Feature: Control Coordinator support for mobile base#1277
Conversation
Greptile SummaryExtends the
Confidence Score: 4/5
Important Files Changed
Sequence DiagramsequenceDiagram
participant KB as KeyboardTeleop
participant LCM as LCM /cmd_vel
participant CC as ControlCoordinator
participant VJ as Virtual Joints
participant TL as TickLoop
participant CTB as ConnectedTwistBase
participant Adapter as TwistBaseAdapter
KB->>LCM: Twist(linear, angular)
LCM->>CC: twist_command subscription
CC->>CC: _on_twist_command()
Note over CC: Map Twist fields to virtual joints<br/>base_vx ← linear.x<br/>base_vy ← linear.y<br/>base_wz ← angular.z
CC->>CC: _on_joint_command(JointState)
CC->>VJ: Route to velocity task
loop Every tick (100Hz)
TL->>CTB: read_state()
CTB->>Adapter: read_velocities() + read_odometry()
Adapter-->>CTB: velocities, odometry
CTB-->>TL: {joint: JointState}
TL->>TL: Arbitrate (per-joint, priority)
TL->>CTB: write_command(velocities, mode)
CTB->>Adapter: write_velocities([vx, vy, wz])
end
Last reviewed commit: d62d44e |
| if hasattr(module, "register"): | ||
| module.register(self) | ||
| except ImportError as e: | ||
| logger.debug(f"Skipping twist base adapter {name}: {e}") |
There was a problem hiding this comment.
debug is hidden by default. It should be at least a warning, but even if you change it to a warning, why skip over broken adapters?
| coord = coordinator_mock_twist_base.build() | ||
|
|
||
| # Build keyboard teleop with LCM transport on /cmd_vel | ||
| teleop = ( | ||
| keyboard_teleop() | ||
| .transports( | ||
| { | ||
| ("cmd_vel", Twist): LCMTransport("/cmd_vel", Twist), | ||
| } | ||
| ) | ||
| .build() |
There was a problem hiding this comment.
Why build two blueprints instead of just having one?
| try: | ||
| coord.loop() | ||
| except KeyboardInterrupt: | ||
| pass |
There was a problem hiding this comment.
This seems vibed. :) loop handles KeyboardInterrupt so it would never be received there.
(Also, loop calls stop when KeyboardInterrupt is received, so you don't have to call it too.)
| # Mock holonomic twist base (3-DOF: vx, vy, wz) | ||
| _base_joints = make_twist_base_joints("base") | ||
| coordinator_mock_twist_base = control_coordinator( | ||
| tick_rate=100.0, |
There was a problem hiding this comment.
I've looked at the codebase, and every since instance of control_coordinator sets tick_rate to 100.0. But that's already the default, so you don't need to set it. joint_state_frame_id is always "coordinator".
I think blueprints should only specify what is different. Many of these values are the default so they shouldn't be mentioned at all.
| component: HardwareComponent, | ||
| ) -> bool: | ||
| """Register a hardware adapter with the coordinator.""" | ||
| from dimos.hardware.drive_trains.spec import TwistBaseAdapter as TwistBaseAdapterProto |
There was a problem hiding this comment.
Can't this be at the top?
| adapter=adapter, # type: ignore[arg-type] | ||
| component=component, | ||
| ) | ||
| else: | ||
| connected = ConnectedHardware( | ||
| adapter=adapter, # type: ignore[arg-type] |
There was a problem hiding this comment.
Why # type: ignore[arg-type]
| if not isinstance(adapter, TwistBaseAdapterProto): | ||
| raise TypeError("adapter must implement TwistBaseAdapter") | ||
|
|
||
| self._adapter: TwistBaseAdapter = adapter # type: ignore[assignment] |
There was a problem hiding this comment.
Please fix # type: ignore[assignment]
| self._current_mode: ControlMode | None = None | ||
|
|
||
| @property | ||
| def adapter(self) -> TwistBaseAdapter: # type: ignore[override] |
There was a problem hiding this comment.
Please fix type: ignore
| def connect(self) -> bool: | ||
| """Connect to FlowBase controller via Portal RPC.""" | ||
| try: | ||
| import portal # type: ignore[import-not-found] |
There was a problem hiding this comment.
What is this? It's not in pyproject.toml.
|
|
||
| def read_velocities(self) -> list[float]: | ||
| """Return last commanded velocities (FlowBase doesn't report actual).""" | ||
| return self._last_velocities.copy() |
There was a problem hiding this comment.
I'm not certain, but at a quick glance, it looks like _last_velocities is written to and read from different threads. Does this need with self._lock?
1e88865 to
c595eb7
Compare
Problem
The ControlCoordinator only supports joint-level control (manipulator arms). Mobile bases, quadrupeds, and drones take Twist (velocity) commands, so there's no way to control them through the coordinator — blocking mobile manipulation.
Solution
Virtual joints map velocity DOFs into the coordinator's existing joint-centric model (
base_vx,base_vy,base_wz) A newtwist_command: In[Twist]port converts Twist → virtual joint velocities, feeding the existing routing pipeline.New
TwistBaseAdapterprotocol (10 methods, SI units) provides a lightweight hardware abstraction.ConnectedTwistBaseinheritsConnectedHardwareto keep the tick loop uniform. IncludesMockTwistBaseAdapterfor testing andFlowBaseAdapterfor real holonomic base hardware via Portal RPC.Breaking Changes
None
How to Test
python -m dimos.control.examples.twist_base_keyboard_teleop/cmd_velin another terminal:python -m dimos.control.examples.echo_cmd_velcloses DIM-547
closes DIM-546