-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathso101_controller.py
More file actions
287 lines (248 loc) · 10.5 KB
/
so101_controller.py
File metadata and controls
287 lines (248 loc) · 10.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
#!/usr/bin/env python3
"""SO101 follower arm controller.
Controls the physical SO101 (SO-ARM100) follower arm over USB via the
Feetech bus. Joint angles are in degrees; gripper is 0–1 (open).
No AgileX or Piper references; SO101 only.
"""
import sys
import threading
import time
from pathlib import Path
from typing import Any
import numpy as np
# Make examples/common importable from the repo root
sys.path.insert(0, str(Path(__file__).resolve().parent / "examples"))
from common.sts3215_bus import SO101FollowerDriver # noqa: E402
# SO101 motor keys in order: 5 body joints + gripper
SO101_JOINT_KEYS = [
"shoulder_pan.pos",
"shoulder_lift.pos",
"elbow_flex.pos",
"wrist_flex.pos",
"wrist_roll.pos",
]
SO101_GRIPPER_KEY = "gripper.pos"
NUM_BODY_JOINTS = 5
class SO101Controller:
"""Controller for the SO101 follower arm (SO-ARM100).
Presents the same logical interface as the previous robot controller:
joint angles in degrees (5 body joints), gripper 0–1, enable/disable,
home, and a background control loop that sends commands when enabled.
"""
def __init__(
self,
port: str = "/dev/ttyACM0",
follower_id: str = "my_awesome_follower_arm",
robot_rate: float = 100.0,
neutral_joint_angles: np.ndarray | None = None,
debug_mode: bool = False,
) -> None:
"""Initialize the SO101 controller (does not connect yet).
Args:
port: Serial port for the SO101 follower (e.g. /dev/ttyACM0 or /dev/ttyUSB0).
follower_id: Calibration id used for this follower (lerobot-setup-motors / calibration).
robot_rate: Control loop rate in Hz.
neutral_joint_angles: Home position [j1..j5] in degrees (default zeros).
debug_mode: Enable debug logging.
"""
self.port = port
self.follower_id = follower_id
self.robot_rate = robot_rate
self.debug_mode = debug_mode
self._robot: SO101FollowerDriver | None = None
self.position_lock = threading.Lock()
self.state_lock = threading.Lock()
self._bus_lock = threading.Lock() # Serialize all serial port access (read/write)
self.running = threading.Event()
self.running.set()
self._robot_enabled = False
if neutral_joint_angles is not None:
self.HOME_JOINT_ANGLES = np.array(neutral_joint_angles, dtype=np.float64)
else:
self.HOME_JOINT_ANGLES = np.zeros(NUM_BODY_JOINTS, dtype=np.float64)
# Gripper: 0 = closed, 1 = open (normalized). SO101 uses 0–100 in action.
self._target_joint_angles = self.HOME_JOINT_ANGLES.copy()
self._gripper_open_value = 0.5 # 0–1
# Joint limits (degrees) – conservative defaults; can be updated from calibration
self.JOINT_LIMITS = np.array(
[
(-150.0, 150.0),
(-180.0, 180.0),
(-150.0, 150.0),
(-150.0, 150.0),
(-180.0, 180.0),
],
dtype=np.float64,
)
self._control_loop_thread = threading.Thread(
target=self._control_loop, daemon=True
)
self._connect()
def _connect(self) -> None:
"""Connect to the SO101 follower arm."""
print(f"Connecting to SO101 follower on {self.port} (id={self.follower_id})...")
self._robot = SO101FollowerDriver(port=self.port, follower_id=self.follower_id)
self._robot.connect()
print("✓ SO101 follower connected")
def start_control_loop(self) -> None:
"""Start the control loop thread."""
self._control_loop_thread.start()
def stop_control_loop(self) -> None:
"""Stop the control loop thread."""
self.running.clear()
if self._control_loop_thread.is_alive():
self._control_loop_thread.join(timeout=2.0)
print("✓ Control loop thread joined")
else:
print("Control loop thread is not running")
def __del__(self) -> None:
self.cleanup()
def cleanup(self) -> None:
"""Disconnect and release resources."""
print("🧹 Cleaning up SO101 controller...")
self.stop_control_loop()
self._set_robot_enabled(False)
if self._robot is not None and self._robot.is_connected:
self._robot.disconnect()
print("✓ SO101 follower disconnected")
self._robot = None
print("✓ SO101 controller cleanup completed")
def _set_robot_enabled(self, enabled: bool) -> None:
with self.state_lock:
self._robot_enabled = enabled
if self.debug_mode:
print(f"SO101 enabled: {enabled}")
def is_robot_enabled(self) -> bool:
with self.state_lock:
return self._robot_enabled
def get_target_joint_angles(self) -> np.ndarray:
"""Target body joint angles [j1..j5] in degrees."""
with self.position_lock:
return self._target_joint_angles.copy()
def set_target_joint_angles(self, joint_angles: np.ndarray) -> None:
"""Set target body joint angles [j1..j5] in degrees."""
with self.position_lock:
angles = np.array(joint_angles, dtype=np.float64)
if angles.size >= NUM_BODY_JOINTS:
angles = angles[:NUM_BODY_JOINTS]
else:
angles = np.resize(angles, NUM_BODY_JOINTS)
self._target_joint_angles = np.clip(
angles,
self.JOINT_LIMITS[:, 0],
self.JOINT_LIMITS[:, 1],
)
def get_gripper_open_value(self) -> float:
"""Gripper open value in [0, 1]."""
with self.position_lock:
return float(np.clip(self._gripper_open_value, 0.0, 1.0))
def set_gripper_open_value(self, value: float) -> None:
"""Set gripper open value in [0, 1]."""
with self.position_lock:
self._gripper_open_value = float(np.clip(value, 0.0, 1.0))
def get_current_joint_angles(self) -> np.ndarray | None:
"""Current body joint angles [j1..j5] in degrees, or None if not connected."""
if self._robot is None or not self._robot.is_connected:
return None
try:
with self._bus_lock:
obs = self._robot.get_observation()
angles = np.array(
[obs[k] for k in SO101_JOINT_KEYS if k in obs],
dtype=np.float64,
)
if angles.size >= NUM_BODY_JOINTS:
return angles[:NUM_BODY_JOINTS]
return None
except Exception as e:
if self.debug_mode:
print(f"get_current_joint_angles: {e}")
return None
def get_current_gripper_open_value(self) -> float | None:
"""Current gripper open value in [0, 1], or None."""
if self._robot is None or not self._robot.is_connected:
return None
try:
with self._bus_lock:
obs = self._robot.get_observation()
raw = obs.get(SO101_GRIPPER_KEY, 50.0)
return float(np.clip(raw / 100.0, 0.0, 1.0))
except Exception as e:
if self.debug_mode:
print(f"get_current_gripper_open_value: {e}")
return None
def _action_from_targets(self) -> dict[str, float]:
"""Build action dict from current targets (keys like shoulder_pan.pos, gripper.pos)."""
with self.position_lock:
j = self._target_joint_angles
g = self._gripper_open_value * 100.0
action = {}
for i, key in enumerate(SO101_JOINT_KEYS):
action[key] = float(j[i])
action[SO101_GRIPPER_KEY] = float(g)
return action
def _control_loop(self) -> None:
"""Send target positions to the robot at robot_rate Hz when enabled."""
period = 1.0 / self.robot_rate
while self.running.is_set():
try:
if self.is_robot_enabled() and self._robot is not None and self._robot.is_connected:
action = self._action_from_targets()
with self._bus_lock:
self._robot.send_action(action)
time.sleep(period)
except Exception as e:
print(f"SO101 control loop error: {e}")
time.sleep(0.01)
def move_to_home(self) -> bool:
"""Set target to home joint angles (and leave gripper as-is)."""
try:
with self.position_lock:
self._target_joint_angles = self.HOME_JOINT_ANGLES.copy()
print("✓ Home position set")
return True
except Exception as e:
print(f"✗ Home error: {e}")
return False
def graceful_stop(self) -> bool:
"""Stop sending commands and mark robot disabled."""
try:
print("🛑 Graceful stop – disabling SO101 command stream")
self._set_robot_enabled(False)
return True
except Exception as e:
print(f"✗ Graceful stop error: {e}")
return False
def resume_robot(self) -> bool:
"""Re-enable and optionally sync targets to current state."""
try:
if self.is_robot_enabled():
return True
self._set_robot_enabled(True)
# Optionally align targets to current to avoid jump
cur = self.get_current_joint_angles()
if cur is not None and len(cur) >= NUM_BODY_JOINTS:
self.set_target_joint_angles(cur)
g = self.get_current_gripper_open_value()
if g is not None:
self.set_gripper_open_value(g)
print("✓ SO101 resumed")
return True
except Exception as e:
print(f"✗ Resume error: {e}")
return False
def is_robot_homed(self, tolerance_degrees: float = 2.0) -> bool:
"""True if current joint angles are near home."""
cur = self.get_current_joint_angles()
if cur is None or len(cur) < NUM_BODY_JOINTS:
return False
return bool(np.all(np.abs(cur - self.HOME_JOINT_ANGLES) < tolerance_degrees))
def get_robot_status(self) -> dict[str, Any]:
"""Status dict for debugging."""
return {
"enabled": self.is_robot_enabled(),
"target_joint_angles": self.get_target_joint_angles(),
"gripper_open_value": self.get_gripper_open_value(),
"current_joint_angles": self.get_current_joint_angles(),
"current_gripper_open_value": self.get_current_gripper_open_value(),
}