diff --git a/hci/user-in-the-box/uitb/scripts/arcade_drive_teleop.py b/hci/user-in-the-box/uitb/scripts/arcade_drive_teleop.py new file mode 100644 index 00000000..50c336b2 --- /dev/null +++ b/hci/user-in-the-box/uitb/scripts/arcade_drive_teleop.py @@ -0,0 +1,427 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +"""遥控驾驶任务专用:姿态保持 + 键盘微调的辅助遥控。 + +仅适用于 task 为 RemoteDriving 的仿真。这个版本不再把 WASD 直接映射到 26 维肌肉增量, +而是使用一个“辅助姿态”: + +1. 零输入时,把手臂拉回离摇杆很近的参考姿态,避免 reset 后随机姿态/肌肉导致手臂自己漂。 +2. W/S 直接给拇指杆一个小幅辅助偏转,保证车真的会动。 +3. A/D 只对手臂做横向微调,用于演示“人手正在操控摇杆”。 +""" +from __future__ import annotations + +import argparse +import ctypes +import os +import sys + +# 直接运行 `python uitb/scripts/本脚本.py` 时,sys.path 不含仓库根目录,会导致 import uitb 失败 +_REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) +if _REPO_ROOT not in sys.path: + sys.path.insert(0, _REPO_ROOT) + +import mujoco +import numpy as np +import pygame + +from uitb.simulator import Simulator +from uitb.utils import interaction_metrics + +_REFERENCE_ARM_QPOS = np.array( + [0.73238167, 1.17101506, 0.27191627, 0.98772476, 1.08483110], + dtype=np.float32, +) +_PUSH_QPOS_VECTOR = np.array([0.03, -0.06, 0.01, -0.08, 0.0], dtype=np.float32) +_LATERAL_QPOS_VECTOR = np.array([0.00, 0.02, 0.12, 0.00, 0.05], dtype=np.float32) +_JOYSTICK_LIMIT = 0.30 + +_IS_WINDOWS = os.name == "nt" +_USER32 = ctypes.windll.user32 if _IS_WINDOWS else None + +_VK_W = 0x57 +_VK_A = 0x41 +_VK_S = 0x53 +_VK_D = 0x44 +_VK_UP = 0x26 +_VK_DOWN = 0x28 +_VK_LEFT = 0x25 +_VK_RIGHT = 0x27 +_VK_HOME = 0x24 +_VK_R = 0x52 +_VK_ESC = 0x1B +_VK_SHIFT = 0x10 + + +def _independent_joint_ranges(sim: Simulator) -> tuple[np.ndarray, np.ndarray]: + jidx = np.asarray(sim.bm_model._independent_joints, dtype=np.int32) + rng = sim._model.jnt_range[jidx].astype(np.float32) + return rng[:, 0], rng[:, 1] + + +def _apply_assisted_pose(sim: Simulator, arm_qpos: np.ndarray, joystick_q: float) -> None: + arm_qpos_idx = sim.bm_model._independent_qpos + arm_dof_idx = sim.bm_model._independent_dofs + muscles = sim.bm_model._muscle_actuators + + sim._data.qpos[arm_qpos_idx] = arm_qpos + sim._data.qvel[arm_dof_idx] = 0.0 + sim._data.act[muscles] = 0.0 + + sim._data.joint("thumb-stick-1:rot-x").qpos[0] = joystick_q + sim._data.joint("thumb-stick-1:rot-x").qvel[0] = 0.0 + sim._data.joint("thumb-stick-2:rot-x").qpos[0] = 0.0 + sim._data.joint("thumb-stick-2:rot-x").qvel[0] = 0.0 + + mujoco.mj_forward(sim._model, sim._data) + + +def _reset_episode_to_reference(sim: Simulator, arm_qpos_ref: np.ndarray) -> None: + _, _ = sim.reset() + + # 保留任务随机生成的车/目标位置,只把手臂与摇杆复位到可控的参考状态。 + car_q = float(sim._data.joint("car").qpos[0]) + target_q = float(sim._data.joint("target").qpos[0]) + + sim._data.qpos[:] = 0.0 + sim._data.qvel[:] = 0.0 + if sim._data.act is not None and sim._data.act.size > 0: + sim._data.act[:] = 0.0 + + sim._data.joint("car").qpos[0] = car_q + sim._data.joint("target").qpos[0] = target_q + _apply_assisted_pose(sim, arm_qpos_ref, joystick_q=0.0) + + +def _draw_hud(surface: pygame.Surface, lines: list[str]) -> None: + try: + font = pygame.font.SysFont("consolas", 16) + except Exception: + font = pygame.font.Font(None, 18) + y = 6 + for txt in lines: + rendered = font.render(txt, True, (255, 255, 255)) + bg = pygame.Surface((rendered.get_width() + 6, rendered.get_height() + 2), pygame.SRCALPHA) + bg.fill((0, 0, 0, 140)) + surface.blit(bg, (4, y)) + surface.blit(rendered, (7, y + 1)) + y += rendered.get_height() + 3 + + +def _joystick_bar(value: float, limit: float, width: int = 20) -> str: + frac = value / limit if abs(limit) > 1e-8 else 0.0 + frac = max(-1.0, min(1.0, frac)) + mid = width // 2 + pos = int(round(frac * mid)) + bar = ["-"] * width + bar[mid] = "|" + idx = mid + pos + idx = max(0, min(width - 1, idx)) + bar[idx] = "#" + return "[" + "".join(bar) + "]" + + +def _win_key_pressed(vk_code: int) -> bool: + if not _IS_WINDOWS or _USER32 is None: + return False + return bool(_USER32.GetAsyncKeyState(vk_code) & 0x8000) + + +def _parse_args() -> argparse.Namespace: + p = argparse.ArgumentParser(description="Assistive WASD teleop for RemoteDriving.") + p.add_argument("simulator_folder", type=str) + p.add_argument("--uncloned", dest="cloned", action="store_false") + p.add_argument("--step", type=float, default=0.035, help="每帧摇杆目标角增量(弧度)") + p.add_argument("--return-rate", type=float, default=0.30, help="松手时摇杆每帧回中比例") + p.add_argument("--lateral-step", type=float, default=0.08, help="A/D 手臂横向偏置增量(归一化尺度)") + p.add_argument("--lateral-return", type=float, default=0.35, help="松开 A/D 后横向偏置每帧衰减比例") + p.add_argument("--csv", type=str, default=None, help="导出指标 CSV 路径") + p.add_argument( + "--layout", + choices=("wasd", "arrows"), + default="wasd", + help="wasd 或 方向键", + ) + return p.parse_args() + + +def main() -> int: + args = _parse_args() + if not os.path.isdir(args.simulator_folder): + print(f"目录不存在: {args.simulator_folder}", file=sys.stderr) + return 1 + + sim = Simulator.get( + args.simulator_folder, + render_mode="human", + render_mode_perception="embed", + run_parameters={"evaluate": True}, + use_cloned=args.cloned, + ) + + if sim.task.__class__.__name__ != "RemoteDriving": + print("本脚本仅用于 RemoteDriving 任务(遥控驾驶 + 拇指杆)。当前任务:" + f"{sim.task.__class__.__name__}", file=sys.stderr) + sim.close() + return 1 + + if "llc" in sim.config: + print("当前仿真启用了 HRL/LLC,不支持本脚本。", file=sys.stderr) + sim.close() + return 1 + + nu_total = int(sim.action_space.shape[0]) + if len(sim.bm_model._independent_qpos) != len(_REFERENCE_ARM_QPOS): + print("独立关节数量与参考姿态不匹配,无法启用辅助姿态遥控。", file=sys.stderr) + sim.close() + return 1 + + arm_lo, arm_hi = _independent_joint_ranges(sim) + arm_qpos_ref = np.clip(_REFERENCE_ARM_QPOS.copy(), arm_lo, arm_hi) + + if args.layout == "wasd": + K_F, K_B, K_L, K_R = pygame.K_w, pygame.K_s, pygame.K_a, pygame.K_d + else: + K_F, K_B, K_L, K_R = pygame.K_UP, pygame.K_DOWN, pygame.K_LEFT, pygame.K_RIGHT + + print( + "Arcade 遥控(辅助姿态):W/S 推拉摇杆 A/D 手臂横移 R 重置局 Home 回中 ESC 退出" + f" | 回中={args.return_rate:.2f}/步" + ) + print("请先点击弹出的仿真窗口使其获得焦点,再按键(否则 WASD 可能无响应)。") + + _reset_episode_to_reference(sim, arm_qpos_ref) + if sim._render_window is not None: + try: + pygame.key.set_repeat(0, 0) + except pygame.error: + pass + + action = np.zeros(nu_total, dtype=np.float32) + joystick_cmd = 0.0 + lateral_cmd = 0.0 + running = True + repeat_set = False + key_state = {"fwd": False, "back": False, "left": False, "right": False, "home": False, "reset": False, "esc": False} + last_key_event = "-" + ep_car_dists: list[float] = [] + all_episodes: list[dict] = [] + episode_idx = 0 + ep_reward = 0.0 + + try: + while running: + pygame.event.pump() + for event in pygame.event.get(): + if event.type == pygame.QUIT: + running = False + elif not _IS_WINDOWS and event.type == pygame.KEYDOWN: + if event.key in (pygame.K_w, pygame.K_UP): + key_state["fwd"] = True + last_key_event = "KEYDOWN:FWD" + elif event.key in (pygame.K_s, pygame.K_DOWN): + key_state["back"] = True + last_key_event = "KEYDOWN:BACK" + elif event.key in (pygame.K_a, pygame.K_LEFT): + key_state["left"] = True + last_key_event = "KEYDOWN:LEFT" + elif event.key in (pygame.K_d, pygame.K_RIGHT): + key_state["right"] = True + last_key_event = "KEYDOWN:RIGHT" + elif event.key == pygame.K_HOME: + key_state["home"] = True + last_key_event = "KEYDOWN:HOME" + if event.key == pygame.K_ESCAPE: + running = False + elif event.key == pygame.K_r: + key_state["reset"] = True + last_key_event = "KEYDOWN:RESET" + elif not _IS_WINDOWS and event.type == pygame.KEYUP: + if event.key in (pygame.K_w, pygame.K_UP): + key_state["fwd"] = False + last_key_event = "KEYUP:FWD" + elif event.key in (pygame.K_s, pygame.K_DOWN): + key_state["back"] = False + last_key_event = "KEYUP:BACK" + elif event.key in (pygame.K_a, pygame.K_LEFT): + key_state["left"] = False + last_key_event = "KEYUP:LEFT" + elif event.key in (pygame.K_d, pygame.K_RIGHT): + key_state["right"] = False + last_key_event = "KEYUP:RIGHT" + elif event.key == pygame.K_HOME: + key_state["home"] = False + last_key_event = "KEYUP:HOME" + + speed_mul = 1.6 if (pygame.key.get_mods() & pygame.KMOD_SHIFT) else 1.0 + joy_step = args.step * speed_mul + lat_step = args.lateral_step * speed_mul + + if not repeat_set and sim._render_window is not None: + try: + pygame.key.set_repeat(180, 35) + except pygame.error: + pass + repeat_set = True + + if _IS_WINDOWS: + shift_pressed = _win_key_pressed(_VK_SHIFT) + speed_mul = 1.6 if shift_pressed else 1.0 + joy_step = args.step * speed_mul + lat_step = args.lateral_step * speed_mul + + fwd_pressed = _win_key_pressed(_VK_W) or _win_key_pressed(_VK_UP) + back_pressed = _win_key_pressed(_VK_S) or _win_key_pressed(_VK_DOWN) + left_pressed = _win_key_pressed(_VK_A) or _win_key_pressed(_VK_LEFT) + right_pressed = _win_key_pressed(_VK_D) or _win_key_pressed(_VK_RIGHT) + home_pressed = _win_key_pressed(_VK_HOME) + reset_pressed = _win_key_pressed(_VK_R) + esc_pressed = _win_key_pressed(_VK_ESC) + + key_state["fwd"] = fwd_pressed + key_state["back"] = back_pressed + key_state["left"] = left_pressed + key_state["right"] = right_pressed + key_state["home"] = home_pressed + key_state["reset"] = reset_pressed + key_state["esc"] = esc_pressed + + pressed_names: list[str] = [] + if fwd_pressed: + pressed_names.append("FWD") + if back_pressed: + pressed_names.append("BACK") + if left_pressed: + pressed_names.append("LEFT") + if right_pressed: + pressed_names.append("RIGHT") + if home_pressed: + pressed_names.append("HOME") + if reset_pressed: + pressed_names.append("RESET") + if esc_pressed: + pressed_names.append("ESC") + last_key_event = "WIN32:" + (",".join(pressed_names) if pressed_names else "-") + else: + fwd_pressed = key_state["fwd"] + back_pressed = key_state["back"] + left_pressed = key_state["left"] + right_pressed = key_state["right"] + home_pressed = key_state["home"] + reset_pressed = key_state["reset"] + esc_pressed = key_state["esc"] + + if esc_pressed: + running = False + continue + + if reset_pressed: + if ep_car_dists: + all_episodes.append({"task": "RemoteDriving", "dist_car_samples": list(ep_car_dists), + "target_parking_success": False, "ep_reward": ep_reward}) + ep_car_dists, ep_reward = [], 0.0 + joystick_cmd = 0.0 + lateral_cmd = 0.0 + _reset_episode_to_reference(sim, arm_qpos_ref) + episode_idx += 1 + if not _IS_WINDOWS: + key_state["reset"] = False + continue + + if home_pressed: + joystick_cmd = 0.0 + lateral_cmd = 0.0 + else: + if fwd_pressed: + joystick_cmd -= joy_step + if back_pressed: + joystick_cmd += joy_step + if not (fwd_pressed or back_pressed): + joystick_cmd *= max(0.0, 1.0 - args.return_rate) + + if left_pressed: + lateral_cmd -= lat_step + if right_pressed: + lateral_cmd += lat_step + if not (left_pressed or right_pressed): + lateral_cmd *= max(0.0, 1.0 - args.lateral_return) + + joystick_cmd = float(np.clip(joystick_cmd, -_JOYSTICK_LIMIT, _JOYSTICK_LIMIT)) + lateral_cmd = float(np.clip(lateral_cmd, -1.0, 1.0)) + + push_scale = joystick_cmd / _JOYSTICK_LIMIT if _JOYSTICK_LIMIT > 1e-8 else 0.0 + arm_qpos = arm_qpos_ref + push_scale * _PUSH_QPOS_VECTOR + lateral_cmd * _LATERAL_QPOS_VECTOR + arm_qpos = np.clip(arm_qpos, arm_lo, arm_hi) + _apply_assisted_pose(sim, arm_qpos, joystick_cmd) + + _obs, reward, terminated, truncated, _info = sim.step(action) + ep_reward += reward + + jpos = float(sim._data.joint("thumb-stick-1:rot-x").qpos[0]) + state = sim.get_state() + d_car = float(state.get("dist_car_to_target", float("nan"))) + inside_target = bool(state.get("inside_target", False)) + success_now = bool( + state.get("target_parking_success", False) or state.get("target_hit", False) + ) + if d_car == d_car: + ep_car_dists.append(d_car) + + focus_ok = bool(pygame.key.get_focused()) + keys_now: list[str] = [] + if fwd_pressed: + keys_now.append("FWD") + if back_pressed: + keys_now.append("BACK") + if left_pressed: + keys_now.append("LEFT") + if right_pressed: + keys_now.append("RIGHT") + if home_pressed: + keys_now.append("HOME") + if reset_pressed: + keys_now.append("RESET") + if esc_pressed: + keys_now.append("ESC") + key_text = ",".join(keys_now) if keys_now else "-" + + bar = _joystick_bar(joystick_cmd, _JOYSTICK_LIMIT) + hud = [ + f"Ep {episode_idx} | stick {bar} q={jpos:+.3f}", + f"car_dist={d_car:.3f} lat={lateral_cmd:+.2f} r={reward:.3f}", + f"inside_target={inside_target} success={success_now}", + f"focus={'YES' if focus_ok else 'NO '} keys={key_text}", + f"last={last_key_event}", + f"W/S push | A/D lateral | Home reset | R new | ESC quit", + ] + + if sim._render_window is not None: + _draw_hud(pygame.display.get_surface(), hud) + pygame.display.flip() + + if terminated or truncated: + # RemoteDriving 的 get_state 里是 target_hit;evaluator 里映射为 target_parking_success + success = bool( + state.get("target_parking_success", False) or state.get("target_hit", False) + ) + if ep_car_dists: + all_episodes.append({"task": "RemoteDriving", "dist_car_samples": list(ep_car_dists), + "target_parking_success": success, "ep_reward": ep_reward}) + ep_car_dists, ep_reward = [], 0.0 + joystick_cmd = 0.0 + lateral_cmd = 0.0 + _reset_episode_to_reference(sim, arm_qpos_ref) + episode_idx += 1 + finally: + if ep_car_dists: + all_episodes.append({"task": "RemoteDriving", "dist_car_samples": list(ep_car_dists), + "target_parking_success": False, "ep_reward": ep_reward}) + if all_episodes: + interaction_metrics.print_run_summary("RemoteDriving", all_episodes, csv_path=args.csv) + sim.close() + + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/hci/user-in-the-box/uitb/scripts/grad_hmi_demo.py b/hci/user-in-the-box/uitb/scripts/grad_hmi_demo.py new file mode 100644 index 00000000..c9f133b2 --- /dev/null +++ b/hci/user-in-the-box/uitb/scripts/grad_hmi_demo.py @@ -0,0 +1,97 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +"""毕业设计:肌肉骨骼手臂人机交互三场景说明与评测辅助。 + +场景 A — 视觉引导的「类方向盘」操控:遥控驾驶任务(拇指杆 + 小车停入目标区),配置见 + uitb/configs/mobl_arms_index_remote_driving.yaml + +场景 B — 控制精度:指向 / 跟踪任务,使用评测脚本 --metrics + +场景 C — 小物体触碰:SmallObjectTouch 任务,配置见 + uitb/configs/grad_small_object_touch.yaml + +首次使用需用 Simulator.build 生成仿真包目录,再训练或运行评测。 + +键盘实时操控(肌肉通道增量,非策略网络): + python uitb/scripts/keyboard_teleop.py <仿真目录> +遥控驾驶 WASD 类「前推后拉 / 左右」近似映射: + python uitb/scripts/arcade_drive_teleop.py + +一键三场景:python uitb/scripts/hmi_quickstart.py + 参数 1=遥控WASD 2=精度评测 3=小物体触碰键盘(可加 --eval 改为仅评测) +""" +from __future__ import annotations + +import argparse +import os +import subprocess +import sys + + +def _repo_root() -> str: + # uitb/scripts/grad_hmi_demo.py -> repo root + return os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) + + +def _config(name: str) -> str: + return os.path.join(_repo_root(), "uitb", "configs", name) + + +CONFIGS = { + "steering": _config("mobl_arms_index_remote_driving.yaml"), + "touch": _config("grad_small_object_touch.yaml"), + "precision": _config("mobl_arms_index_pointing_precision.yaml"), +} + + +def _print_build_help() -> None: + print("在仓库根目录执行:\n") + for name, cfg in CONFIGS.items(): + print(f" [{name}]") + print(f" python -c \"from uitb.simulator import Simulator; Simulator.build(r'{cfg}')\"") + print("\n生成目录默认在 uitb/simulators//(或由 config 里的 simulator_folder 指定)。") + + +def _run_eval(sim_folder: str, episodes: int, human: bool, record: bool) -> int: + cmd = [ + sys.executable, "-m", "uitb.test.evaluator", + sim_folder, + "--metrics", + "--deterministic", + "--num_episodes", str(episodes), + ] + if human: + cmd.append("--human") + if record: + cmd.append("--record") + print("运行:", " ".join(cmd)) + return subprocess.call(cmd, cwd=_repo_root()) + + +def main() -> int: + parser = argparse.ArgumentParser(description="毕业设计 HMI 三场景说明与评测") + parser.add_argument( + "simulator_folder", + nargs="?", + default=None, + help="已构建的仿真目录(内含 config.yaml);若省略则只打印构建说明", + ) + parser.add_argument("--episodes", type=int, default=5, help="评测回合数") + parser.add_argument("--human", action="store_true", help="PyGame 实时窗口") + parser.add_argument("--record", action="store_true", help="录制 mp4") + args = parser.parse_args() + + if args.simulator_folder is None: + _print_build_help() + return 0 + + if not os.path.isdir(args.simulator_folder): + print(f"目录不存在: {args.simulator_folder}") + _print_build_help() + return 1 + + return _run_eval(os.path.abspath(args.simulator_folder), args.episodes, args.human, args.record) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/hci/user-in-the-box/uitb/scripts/hmi_quickstart.py b/hci/user-in-the-box/uitb/scripts/hmi_quickstart.py new file mode 100644 index 00000000..8fed61c8 --- /dev/null +++ b/hci/user-in-the-box/uitb/scripts/hmi_quickstart.py @@ -0,0 +1,119 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +"""HMI 统一启动器:三个场景对应三个辅助姿态遥控脚本。 + +1) 遥控驾驶 (arcade_drive_teleop.py) — WASD 推拉摇杆 +2) 控制精度 (pointing_teleop.py) — WASD 控制手指指向目标 +3) 小物体触碰 (touch_teleop.py) — WASD 控制手指触碰小目标 + +加 --eval 可改为自动评测路径(需要 checkpoint)。 + +仿真目录可通过环境变量覆盖: + HMI_SIM_REMOTE_DRIVING, HMI_SIM_PRECISION, HMI_SIM_TOUCH +""" +from __future__ import annotations + +import argparse +import os +import subprocess +import sys + + +def _root() -> str: + return os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) + + +def _pick_sim(env_name: str, default_rel: str) -> str: + v = os.environ.get(env_name) + if v: + return os.path.abspath(v) + return os.path.join(_root(), default_rel.replace("/", os.sep)) + + +def main() -> int: + ap = argparse.ArgumentParser( + description="肌肉驱动手臂 HMI 系统 — 统一启动器" + ) + ap.add_argument( + "what", + nargs="?", + choices=("1", "2", "3", "arcade", "precision", "touch"), + default=None, + help="1=遥控驾驶 | 2=控制精度 | 3=小物体触碰", + ) + ap.add_argument("--eval", action="store_true", help="改为自动评测路径(需 checkpoint)") + ap.add_argument("--episodes", type=int, default=10, help="自动评测回合数") + ap.add_argument("--csv", type=str, default=None, help="导出指标 CSV 路径(人工操控模式)") + ap.set_defaults(cloned=True) + ap.add_argument("--uncloned", dest="cloned", action="store_false") + args = ap.parse_args() + + scripts = os.path.join(_root(), "uitb", "scripts") + + if args.what is None: + print("=" * 50) + print(" 肌肉驱动手臂 HMI 系统") + print("=" * 50) + print() + print("用法: python uitb/scripts/hmi_quickstart.py <1|2|3>") + print() + print(" 1 / arcade 场景1: 遥控驾驶(WASD 推拉摇杆)") + print(" 2 / precision 场景2: 控制精度(WASD 控制手指指向目标)") + print(" 3 / touch 场景3: 小物体触碰(WASD 控制手指触碰小目标)") + print() + print(" 加 --eval 改为自动评测(需 checkpoint)") + print(" 加 --csv metrics.csv 导出指标") + print() + print("默认仿真目录:") + print(f" 遥控驾驶: {_pick_sim('HMI_SIM_REMOTE_DRIVING', 'simulators/mobl_arms_index_remote_driving')}") + print(f" 控制精度: {_pick_sim('HMI_SIM_PRECISION', 'simulators/mobl_arms_index_pointing')}") + print(f" 小物体: {_pick_sim('HMI_SIM_TOUCH', 'simulators/grad_small_object_touch')}") + print(f"\n仓库根: {_root()}") + return 0 + + clone_flag = [] if args.cloned else ["--uncloned"] + csv_flag = ["--csv", args.csv] if args.csv else [] + key = args.what + + if key in ("1", "arcade"): + sim = _pick_sim("HMI_SIM_REMOTE_DRIVING", "simulators/mobl_arms_index_remote_driving") + if not os.path.isdir(sim): + print(f"目录不存在: {sim}", file=sys.stderr) + return 1 + if args.eval: + cmd = [sys.executable, "-m", "uitb.test.evaluator", sim, + "--metrics", "--deterministic", "--num_episodes", str(args.episodes)] + clone_flag + else: + cmd = [sys.executable, os.path.join(scripts, "arcade_drive_teleop.py"), sim] + clone_flag + csv_flag + print("启动:", " ".join(cmd)) + return subprocess.call(cmd, cwd=_root()) + + if key in ("2", "precision"): + sim = _pick_sim("HMI_SIM_PRECISION", "simulators/mobl_arms_index_pointing") + if not os.path.isdir(sim): + print(f"目录不存在: {sim}", file=sys.stderr) + return 1 + if args.eval: + cmd = [sys.executable, "-m", "uitb.test.evaluator", sim, + "--metrics", "--deterministic", "--num_episodes", str(args.episodes)] + clone_flag + else: + cmd = [sys.executable, os.path.join(scripts, "pointing_teleop.py"), sim] + clone_flag + csv_flag + print("启动:", " ".join(cmd)) + return subprocess.call(cmd, cwd=_root()) + + # touch + sim = _pick_sim("HMI_SIM_TOUCH", "simulators/grad_small_object_touch") + if not os.path.isdir(sim): + print(f"目录不存在: {sim}", file=sys.stderr) + return 1 + if args.eval: + cmd = [sys.executable, "-m", "uitb.test.evaluator", sim, + "--metrics", "--deterministic", "--num_episodes", str(args.episodes)] + clone_flag + else: + cmd = [sys.executable, os.path.join(scripts, "touch_teleop.py"), sim] + clone_flag + csv_flag + print("启动:", " ".join(cmd)) + return subprocess.call(cmd, cwd=_root()) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/hci/user-in-the-box/uitb/scripts/keyboard_teleop.py b/hci/user-in-the-box/uitb/scripts/keyboard_teleop.py new file mode 100644 index 00000000..6d40654c --- /dev/null +++ b/hci/user-in-the-box/uitb/scripts/keyboard_teleop.py @@ -0,0 +1,148 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +"""键盘实时操控仿真(肌肉控制量 [-1,1],与训练时 action_space 一致)。 + +操作(主键盘): + ← / → 切换当前驱动的执行器(肌肉)通道索引 + ↑ / ↓ 增大 / 减小当前通道的控制量(步长见 --step,按住 Shift 为大步长) + Home 将所有通道置零 + PageUp / PageDown 上一组 / 下一组:每次跳过 --group 个通道(便于高维快速浏览) + ESC 退出 + +说明:本环境默认不是「键盘直接对应关节」,而是对 PPO 使用的同一套 muscle/motor +动作向量做增量编辑;适合毕设演示人机对比或手动试控。 +若仿真为 RemoteDriving,可改用 arcade_drive_teleop.py(WASD 前推/左右分组)。 +""" +from __future__ import annotations + +import argparse +import os +import sys + +_REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) +if _REPO_ROOT not in sys.path: + sys.path.insert(0, _REPO_ROOT) + +import numpy as np +import pygame +import mujoco + +from uitb.simulator import Simulator + + +def _actuator_labels(simulator: Simulator) -> list[str]: + m = simulator._model + n = simulator.bm_model.nu + out = [] + for i in range(n): + name = mujoco.mj_id2name(m, mujoco.mjtObj.mjOBJ_ACTUATOR, i) + out.append(name if name else f"actuator_{i}") + return out + + +def _parse_args() -> argparse.Namespace: + p = argparse.ArgumentParser(description="Keyboard teleop for uitb Simulator (muscle actions in [-1,1]).") + p.add_argument("simulator_folder", type=str, help="已构建的仿真目录(含 config.yaml)") + p.add_argument("--uncloned", dest="cloned", action="store_false", help="使用 uitb 源码而非克隆包") + p.add_argument("--step", type=float, default=0.04, help="↑/↓ 单步调整量(默认 0.04)") + p.add_argument("--step-fast", type=float, default=0.12, help="按住 Shift 时步长(默认 0.12)") + p.add_argument("--group", type=int, default=4, help="PageUp/PageDown 每次跳过的通道数(默认 4)") + return p.parse_args() + + +def main() -> int: + args = _parse_args() + if not os.path.isdir(args.simulator_folder): + print(f"目录不存在: {args.simulator_folder}", file=sys.stderr) + return 1 + + run_params = {"evaluate": True} + sim = Simulator.get( + args.simulator_folder, + render_mode="human", + render_mode_perception="embed", + run_parameters=run_params, + use_cloned=args.cloned, + ) + + if "llc" in sim.config: + print("当前仿真启用了 HRL/LLC,键盘脚本仅支持普通肌肉控制模式。", file=sys.stderr) + sim.close() + return 1 + + nu_total = int(sim.action_space.shape[0]) + nu_bm = int(sim.bm_model.nu) + if nu_bm < 1: + print("bm_model.nu < 1,无可键盘操控的执行器。", file=sys.stderr) + sim.close() + return 1 + nu_perc = nu_total - nu_bm + labels = _actuator_labels(sim) + + action = np.zeros(nu_total, dtype=np.float32) + ch = 0 + repeat_set = False + + print("键盘操控已启动,请看弹出的 PyGame 窗口标题与下方说明。") + print("←→ 换通道 ↑↓ 调值 Home 清零 PgUp/PgDn 跳组 Shift=大步长 ESC 退出") + if nu_perc > 0: + print(f"注意:另有 {nu_perc} 维感知执行器已置零(本脚本仅控制前 {nu_bm} 维肌肉)。") + + _, _ = sim.reset() + terminated = truncated = False + running = True + + try: + while running: + step_size = args.step_fast if (pygame.key.get_mods() & pygame.KMOD_SHIFT) else args.step + + for event in pygame.event.get(): + if event.type == pygame.QUIT: + running = False + continue + if event.type != pygame.KEYDOWN: + continue + k = event.key + if k == pygame.K_ESCAPE: + running = False + continue + if k == pygame.K_LEFT: + ch = (ch - 1) % nu_bm + elif k == pygame.K_RIGHT: + ch = (ch + 1) % nu_bm + elif k == pygame.K_UP: + action[ch] = float(np.clip(action[ch] + step_size, -1.0, 1.0)) + elif k == pygame.K_DOWN: + action[ch] = float(np.clip(action[ch] - step_size, -1.0, 1.0)) + elif k == pygame.K_HOME: + action[:] = 0.0 + elif k == pygame.K_PAGEUP: + ch = (ch - max(1, args.group)) % nu_bm + elif k == pygame.K_PAGEDOWN: + ch = (ch + max(1, args.group)) % nu_bm + + _obs, reward, terminated, truncated, info = sim.step(action) + + if not repeat_set and sim._render_window is not None: + pygame.key.set_repeat(280, 45) + repeat_set = True + + cap_ch = max(nu_bm - 1, 0) + caption = ( + f"Teleop | {ch}/{cap_ch} {labels[ch][:26]} | " + f"u={action[ch]:+.2f} | r={reward:.3f} | ESC quit" + ) + if sim._render_window is not None: + pygame.display.set_caption(caption) + + if terminated or truncated: + _, _ = sim.reset() + terminated = truncated = False + finally: + sim.close() + + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/hci/user-in-the-box/uitb/scripts/pointing_teleop.py b/hci/user-in-the-box/uitb/scripts/pointing_teleop.py new file mode 100644 index 00000000..ff2c073e --- /dev/null +++ b/hci/user-in-the-box/uitb/scripts/pointing_teleop.py @@ -0,0 +1,372 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +"""控制精度测试:辅助姿态 + 方向键操控手指指向目标。 + +适用于 Pointing / SmallObjectTouch 任务。手臂从一个参考姿态出发, +方向键(或 WASD)在目标平面的 Y/Z 方向持续平移,松手后保持当前位置。 + +按键: + W / ↑ 手指向上 + S / ↓ 手指向下 + A / ← 手指向左 + D / → 手指向右 + Home 偏置清零(回到参考姿态) + R 重新开局 + ESC 退出 +""" +from __future__ import annotations + +import argparse +import ctypes +import os +import sys + +_REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) +if _REPO_ROOT not in sys.path: + sys.path.insert(0, _REPO_ROOT) + +import mujoco +import numpy as np +import pygame + +from uitb.simulator import Simulator +from uitb.utils import interaction_metrics + +_REFERENCE_ARM_QPOS = np.array( + [1.4777, 0.8877, -1.1609, 1.2665, -1.0318], + dtype=np.float32, +) + +_Y_VECTOR = np.array([0.00, 0.00, 0.18, 0.00, 0.03], dtype=np.float32) +_Z_VECTOR = np.array([0.10, 0.00, 0.00, 0.05, 0.00], dtype=np.float32) + +_OFFSET_LIMIT = 2.3 + +_IS_WINDOWS = os.name == "nt" +_USER32 = ctypes.windll.user32 if _IS_WINDOWS else None + +_VK_W = 0x57 +_VK_A = 0x41 +_VK_S = 0x53 +_VK_D = 0x44 +_VK_UP = 0x26 +_VK_DOWN = 0x28 +_VK_LEFT = 0x25 +_VK_RIGHT = 0x27 +_VK_HOME = 0x24 +_VK_R = 0x52 +_VK_ESC = 0x1B +_VK_SHIFT = 0x10 + + +def _joint_ranges(sim: Simulator) -> tuple[np.ndarray, np.ndarray]: + jidx = np.asarray(sim.bm_model._independent_joints, dtype=np.int32) + rng = sim._model.jnt_range[jidx].astype(np.float32) + return rng[:, 0], rng[:, 1] + + +def _apply_pose(sim: Simulator, arm_qpos: np.ndarray) -> None: + idx_q = sim.bm_model._independent_qpos + idx_d = sim.bm_model._independent_dofs + muscles = sim.bm_model._muscle_actuators + sim._data.qpos[idx_q] = arm_qpos + sim._data.qvel[idx_d] = 0.0 + sim._data.act[muscles] = 0.0 + mujoco.mj_forward(sim._model, sim._data) + + +def _reset_to_ref(sim: Simulator, ref: np.ndarray) -> None: + _, _ = sim.reset() + sim._data.qvel[:] = 0.0 + if sim._data.act is not None and sim._data.act.size > 0: + sim._data.act[:] = 0.0 + _apply_pose(sim, ref) + + +def _draw_hud(surface: pygame.Surface, lines: list[str]) -> None: + try: + font = pygame.font.SysFont("consolas", 16) + except Exception: + font = pygame.font.Font(None, 18) + y = 6 + for txt in lines: + rendered = font.render(txt, True, (255, 255, 255)) + bg = pygame.Surface((rendered.get_width() + 6, rendered.get_height() + 2), pygame.SRCALPHA) + bg.fill((0, 0, 0, 140)) + surface.blit(bg, (4, y)) + surface.blit(rendered, (7, y + 1)) + y += rendered.get_height() + 3 + + +def _win_key_pressed(vk_code: int) -> bool: + if not _IS_WINDOWS or _USER32 is None: + return False + return bool(_USER32.GetAsyncKeyState(vk_code) & 0x8000) + + +def _parse_args() -> argparse.Namespace: + p = argparse.ArgumentParser(description="Pointing / Touch precision teleop with assisted pose.") + p.add_argument("simulator_folder", type=str) + p.add_argument("--uncloned", dest="cloned", action="store_false") + p.add_argument("--step", type=float, default=0.05, help="每帧偏置增量") + p.add_argument("--return-rate", type=float, default=0.0, help="松手时偏置自动回中比例;0 表示保持当前位置") + p.add_argument("--csv", type=str, default=None, help="导出指标 CSV 路径") + p.add_argument( + "--layout", choices=("wasd", "arrows"), default="wasd", help="wasd 或 方向键", + ) + return p.parse_args() + + +def main() -> int: + args = _parse_args() + if not os.path.isdir(args.simulator_folder): + print(f"目录不存在: {args.simulator_folder}", file=sys.stderr) + return 1 + + sim = Simulator.get( + args.simulator_folder, + render_mode="human", + render_mode_perception="embed", + run_parameters={"evaluate": True}, + use_cloned=args.cloned, + ) + + task_name = sim.task.__class__.__name__ + if task_name not in ("Pointing", "SmallObjectTouch"): + print(f"本脚本仅用于 Pointing / SmallObjectTouch 任务。当前: {task_name}", file=sys.stderr) + sim.close() + return 1 + + nu_total = int(sim.action_space.shape[0]) + if len(sim.bm_model._independent_qpos) != len(_REFERENCE_ARM_QPOS): + print("独立关节数量与参考姿态不匹配。", file=sys.stderr) + sim.close() + return 1 + + arm_lo, arm_hi = _joint_ranges(sim) + arm_ref = np.clip(_REFERENCE_ARM_QPOS.copy(), arm_lo, arm_hi) + + if args.layout == "wasd": + K_U, K_D, K_L, K_R = pygame.K_w, pygame.K_s, pygame.K_a, pygame.K_d + else: + K_U, K_D, K_L, K_R = pygame.K_UP, pygame.K_DOWN, pygame.K_LEFT, pygame.K_RIGHT + + print(f"精度测试遥控(辅助姿态):{args.layout.upper()} 控制手指 Shift 加速 R 重置 Home 回中 ESC 退出") + print("请先点击弹出的仿真窗口使其获得焦点。") + + _reset_to_ref(sim, arm_ref) + if sim._render_window is not None: + try: + pygame.key.set_repeat(0, 0) + except pygame.error: + pass + + action = np.zeros(nu_total, dtype=np.float32) + off_y = 0.0 + off_z = 0.0 + running = True + repeat_set = False + key_state = {"up": False, "down": False, "left": False, "right": False, "home": False, "reset": False, "esc": False} + last_key_event = "-" + + ep_dists: list[float] = [] + ep_insides: list[bool] = [] + all_episodes: list[dict] = [] + targets_hit = 0 + episode_idx = 0 + + try: + while running: + pygame.event.pump() + for event in pygame.event.get(): + if event.type == pygame.QUIT: + running = False + elif not _IS_WINDOWS and event.type == pygame.KEYDOWN: + if event.key in (K_U,): + key_state["up"] = True + last_key_event = "KEYDOWN:UP" + elif event.key in (K_D,): + key_state["down"] = True + last_key_event = "KEYDOWN:DOWN" + elif event.key in (K_L,): + key_state["left"] = True + last_key_event = "KEYDOWN:LEFT" + elif event.key in (K_R,): + key_state["right"] = True + last_key_event = "KEYDOWN:RIGHT" + elif event.key == pygame.K_HOME: + key_state["home"] = True + last_key_event = "KEYDOWN:HOME" + elif event.key == pygame.K_r: + key_state["reset"] = True + last_key_event = "KEYDOWN:RESET" + elif event.key == pygame.K_ESCAPE: + key_state["esc"] = True + last_key_event = "KEYDOWN:ESC" + elif not _IS_WINDOWS and event.type == pygame.KEYUP: + if event.key in (K_U,): + key_state["up"] = False + last_key_event = "KEYUP:UP" + elif event.key in (K_D,): + key_state["down"] = False + last_key_event = "KEYUP:DOWN" + elif event.key in (K_L,): + key_state["left"] = False + last_key_event = "KEYUP:LEFT" + elif event.key in (K_R,): + key_state["right"] = False + last_key_event = "KEYUP:RIGHT" + elif event.key == pygame.K_HOME: + key_state["home"] = False + last_key_event = "KEYUP:HOME" + elif event.key == pygame.K_r: + key_state["reset"] = False + last_key_event = "KEYUP:RESET" + elif event.key == pygame.K_ESCAPE: + key_state["esc"] = False + last_key_event = "KEYUP:ESC" + + if not repeat_set and sim._render_window is not None: + try: + pygame.key.set_repeat(180, 35) + except pygame.error: + pass + repeat_set = True + + if _IS_WINDOWS: + up_pressed = _win_key_pressed(_VK_W) or _win_key_pressed(_VK_UP) + down_pressed = _win_key_pressed(_VK_S) or _win_key_pressed(_VK_DOWN) + left_pressed = _win_key_pressed(_VK_A) or _win_key_pressed(_VK_LEFT) + right_pressed = _win_key_pressed(_VK_D) or _win_key_pressed(_VK_RIGHT) + home_pressed = _win_key_pressed(_VK_HOME) + reset_pressed = _win_key_pressed(_VK_R) + esc_pressed = _win_key_pressed(_VK_ESC) + shift_pressed = _win_key_pressed(_VK_SHIFT) + else: + up_pressed = key_state["up"] + down_pressed = key_state["down"] + left_pressed = key_state["left"] + right_pressed = key_state["right"] + home_pressed = key_state["home"] + reset_pressed = key_state["reset"] + esc_pressed = key_state["esc"] + shift_pressed = bool(pygame.key.get_mods() & pygame.KMOD_SHIFT) + + mul = 2.2 if shift_pressed else 1.0 + step = args.step * mul + + if esc_pressed: + running = False + continue + + if reset_pressed: + if ep_dists: + all_episodes.append(interaction_metrics.collect_episode_metrics( + task_name, ep_dists, ep_insides, targets_hit)) + ep_dists, ep_insides, targets_hit = [], [], 0 + off_y, off_z = 0.0, 0.0 + _reset_to_ref(sim, arm_ref) + episode_idx += 1 + if not _IS_WINDOWS: + key_state["reset"] = False + continue + + if home_pressed: + off_y, off_z = 0.0, 0.0 + else: + if left_pressed: + off_y -= step + if right_pressed: + off_y += step + if args.return_rate > 0.0 and not (left_pressed or right_pressed): + off_y *= max(0.0, 1.0 - args.return_rate) + if up_pressed: + off_z += step + if down_pressed: + off_z -= step + if args.return_rate > 0.0 and not (up_pressed or down_pressed): + off_z *= max(0.0, 1.0 - args.return_rate) + + off_y = float(np.clip(off_y, -_OFFSET_LIMIT, _OFFSET_LIMIT)) + off_z = float(np.clip(off_z, -_OFFSET_LIMIT, _OFFSET_LIMIT)) + if abs(off_y) < 1e-4: + off_y = 0.0 + if abs(off_z) < 1e-4: + off_z = 0.0 + + arm_q = arm_ref + off_y * _Y_VECTOR + off_z * _Z_VECTOR + arm_q = np.clip(arm_q, arm_lo, arm_hi) + _apply_pose(sim, arm_q) + + _obs, reward, terminated, truncated, _info = sim.step(action) + + state = sim.get_state() + dist = float(state.get("dist_to_target_center", float("nan"))) + inside = bool(state.get("inside_target", False)) + hit = bool(state.get("target_hit", False)) + t_hit = int(state.get("targets_hit", 0)) + t_radius = float(state.get("target_radius", 0.05)) + + if dist == dist: + ep_dists.append(dist) + ep_insides.append(inside) + if hit: + targets_hit = t_hit + + focus_ok = bool(pygame.key.get_focused()) + keys_now: list[str] = [] + if up_pressed: + keys_now.append("UP") + if down_pressed: + keys_now.append("DOWN") + if left_pressed: + keys_now.append("LEFT") + if right_pressed: + keys_now.append("RIGHT") + if home_pressed: + keys_now.append("HOME") + if reset_pressed: + keys_now.append("RESET") + if esc_pressed: + keys_now.append("ESC") + key_text = ",".join(keys_now) if keys_now else "-" + last_key_event = key_text + + hud = [ + f"Ep {episode_idx} | dist={dist:.4f} radius={t_radius:.3f}", + f"inside={inside} hits={targets_hit} r={reward:.3f}", + f"pos_y={off_y:+.2f} pos_z={off_z:+.2f} hold={'ON' if args.return_rate <= 0.0 else 'OFF'}", + f"focus={'YES' if focus_ok else 'NO '} keys={key_text}", + f"last={last_key_event}", + f"{args.layout.upper()} move | Shift fast | Home center | R new | ESC quit", + ] + if ep_dists: + stats = interaction_metrics.scalar_stats(ep_dists) + hud.append(f"RMSE={stats['rmse']:.4f} MAE={stats['mae']:.4f}") + + if sim._render_window is not None: + _draw_hud(pygame.display.get_surface(), hud) + pygame.display.flip() + + if terminated or truncated: + if ep_dists: + all_episodes.append(interaction_metrics.collect_episode_metrics( + task_name, ep_dists, ep_insides, targets_hit)) + ep_dists, ep_insides, targets_hit = [], [], 0 + off_y, off_z = 0.0, 0.0 + _reset_to_ref(sim, arm_ref) + episode_idx += 1 + + finally: + if ep_dists: + all_episodes.append(interaction_metrics.collect_episode_metrics( + task_name, ep_dists, ep_insides, targets_hit)) + if all_episodes: + interaction_metrics.print_run_summary(task_name, all_episodes, csv_path=args.csv) + sim.close() + + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/hci/user-in-the-box/uitb/scripts/touch_teleop.py b/hci/user-in-the-box/uitb/scripts/touch_teleop.py new file mode 100644 index 00000000..6973d245 --- /dev/null +++ b/hci/user-in-the-box/uitb/scripts/touch_teleop.py @@ -0,0 +1,378 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +"""小物体触碰:辅助姿态 + 方向键操控手指触碰小目标。 + +适用于 SmallObjectTouch 任务。手臂从参考姿态出发,方向键控制手指在目标平面内持续平移, +松手后保持当前位置。 + +按键: + W / ↑ 手指向上 + S / ↓ 手指向下 + A / ← 手指向左 + D / → 手指向右 + Home 偏置清零 + R 重新开局 + ESC 退出 +""" +from __future__ import annotations + +import argparse +import ctypes +import os +import sys + +_REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) +if _REPO_ROOT not in sys.path: + sys.path.insert(0, _REPO_ROOT) + +import mujoco +import numpy as np +import pygame + +from uitb.simulator import Simulator +from uitb.utils import interaction_metrics + +_REFERENCE_ARM_QPOS = np.array( + [1.4777, 0.8877, -1.1609, 1.2665, -1.0318], + dtype=np.float32, +) + +_Y_VECTOR = np.array([0.00, 0.00, 0.18, 0.00, 0.03], dtype=np.float32) +_Z_VECTOR = np.array([0.10, 0.00, 0.00, 0.05, 0.00], dtype=np.float32) + +_OFFSET_LIMIT = 2.3 + +_IS_WINDOWS = os.name == "nt" +_USER32 = ctypes.windll.user32 if _IS_WINDOWS else None + +_VK_W = 0x57 +_VK_A = 0x41 +_VK_S = 0x53 +_VK_D = 0x44 +_VK_UP = 0x26 +_VK_DOWN = 0x28 +_VK_LEFT = 0x25 +_VK_RIGHT = 0x27 +_VK_HOME = 0x24 +_VK_R = 0x52 +_VK_ESC = 0x1B +_VK_SHIFT = 0x10 + + +def _joint_ranges(sim: Simulator) -> tuple[np.ndarray, np.ndarray]: + jidx = np.asarray(sim.bm_model._independent_joints, dtype=np.int32) + rng = sim._model.jnt_range[jidx].astype(np.float32) + return rng[:, 0], rng[:, 1] + + +def _apply_pose(sim: Simulator, arm_qpos: np.ndarray) -> None: + idx_q = sim.bm_model._independent_qpos + idx_d = sim.bm_model._independent_dofs + muscles = sim.bm_model._muscle_actuators + sim._data.qpos[idx_q] = arm_qpos + sim._data.qvel[idx_d] = 0.0 + sim._data.act[muscles] = 0.0 + mujoco.mj_forward(sim._model, sim._data) + + +def _reset_to_ref(sim: Simulator, ref: np.ndarray) -> None: + _, _ = sim.reset() + sim._data.qvel[:] = 0.0 + if sim._data.act is not None and sim._data.act.size > 0: + sim._data.act[:] = 0.0 + _apply_pose(sim, ref) + + +def _draw_hud(surface: pygame.Surface, lines: list[str], color_hint: tuple = (255, 255, 255)) -> None: + try: + font = pygame.font.SysFont("consolas", 16) + except Exception: + font = pygame.font.Font(None, 18) + y = 6 + for txt in lines: + rendered = font.render(txt, True, color_hint) + bg = pygame.Surface((rendered.get_width() + 6, rendered.get_height() + 2), pygame.SRCALPHA) + bg.fill((0, 0, 0, 140)) + surface.blit(bg, (4, y)) + surface.blit(rendered, (7, y + 1)) + y += rendered.get_height() + 3 + + +def _win_key_pressed(vk_code: int) -> bool: + if not _IS_WINDOWS or _USER32 is None: + return False + return bool(_USER32.GetAsyncKeyState(vk_code) & 0x8000) + + +def _parse_args() -> argparse.Namespace: + p = argparse.ArgumentParser(description="SmallObjectTouch teleop with assisted pose.") + p.add_argument("simulator_folder", type=str) + p.add_argument("--uncloned", dest="cloned", action="store_false") + p.add_argument("--step", type=float, default=0.05) + p.add_argument("--return-rate", type=float, default=0.0) + p.add_argument("--csv", type=str, default=None) + p.add_argument("--layout", choices=("wasd", "arrows"), default="wasd") + return p.parse_args() + + +def main() -> int: + args = _parse_args() + if not os.path.isdir(args.simulator_folder): + print(f"目录不存在: {args.simulator_folder}", file=sys.stderr) + return 1 + + sim = Simulator.get( + args.simulator_folder, + render_mode="human", + render_mode_perception="embed", + run_parameters={"evaluate": True}, + use_cloned=args.cloned, + ) + + task_name = sim.task.__class__.__name__ + if task_name != "SmallObjectTouch": + print(f"本脚本仅用于 SmallObjectTouch 任务。当前: {task_name}", file=sys.stderr) + sim.close() + return 1 + nu_total = int(sim.action_space.shape[0]) + if len(sim.bm_model._independent_qpos) != len(_REFERENCE_ARM_QPOS): + print("独立关节数量与参考姿态不匹配。", file=sys.stderr) + sim.close() + return 1 + + arm_lo, arm_hi = _joint_ranges(sim) + arm_ref = np.clip(_REFERENCE_ARM_QPOS.copy(), arm_lo, arm_hi) + + if args.layout == "wasd": + K_U, K_D, K_L, K_R = pygame.K_w, pygame.K_s, pygame.K_a, pygame.K_d + else: + K_U, K_D, K_L, K_R = pygame.K_UP, pygame.K_DOWN, pygame.K_LEFT, pygame.K_RIGHT + + print(f"小物体触碰遥控(辅助姿态):{args.layout.upper()} 控制手指 Shift 加速 R 重置 Home 回中 ESC 退出") + print("请先点击弹出的仿真窗口使其获得焦点。") + + _reset_to_ref(sim, arm_ref) + if sim._render_window is not None: + try: + pygame.key.set_repeat(0, 0) + except pygame.error: + pass + + action = np.zeros(nu_total, dtype=np.float32) + off_y = 0.0 + off_z = 0.0 + running = True + repeat_set = False + key_state = {"up": False, "down": False, "left": False, "right": False, "home": False, "reset": False, "esc": False} + last_key_event = "-" + + ep_dists: list[float] = [] + ep_insides: list[bool] = [] + all_episodes: list[dict] = [] + targets_hit = 0 + episode_idx = 0 + flash_timer = 0 + + try: + while running: + pygame.event.pump() + for event in pygame.event.get(): + if event.type == pygame.QUIT: + running = False + elif not _IS_WINDOWS and event.type == pygame.KEYDOWN: + if event.key in (K_U,): + key_state["up"] = True + last_key_event = "KEYDOWN:UP" + elif event.key in (K_D,): + key_state["down"] = True + last_key_event = "KEYDOWN:DOWN" + elif event.key in (K_L,): + key_state["left"] = True + last_key_event = "KEYDOWN:LEFT" + elif event.key in (K_R,): + key_state["right"] = True + last_key_event = "KEYDOWN:RIGHT" + elif event.key == pygame.K_HOME: + key_state["home"] = True + last_key_event = "KEYDOWN:HOME" + elif event.key == pygame.K_r: + key_state["reset"] = True + last_key_event = "KEYDOWN:RESET" + elif event.key == pygame.K_ESCAPE: + key_state["esc"] = True + last_key_event = "KEYDOWN:ESC" + elif not _IS_WINDOWS and event.type == pygame.KEYUP: + if event.key in (K_U,): + key_state["up"] = False + last_key_event = "KEYUP:UP" + elif event.key in (K_D,): + key_state["down"] = False + last_key_event = "KEYUP:DOWN" + elif event.key in (K_L,): + key_state["left"] = False + last_key_event = "KEYUP:LEFT" + elif event.key in (K_R,): + key_state["right"] = False + last_key_event = "KEYUP:RIGHT" + elif event.key == pygame.K_HOME: + key_state["home"] = False + last_key_event = "KEYUP:HOME" + elif event.key == pygame.K_r: + key_state["reset"] = False + last_key_event = "KEYUP:RESET" + elif event.key == pygame.K_ESCAPE: + key_state["esc"] = False + last_key_event = "KEYUP:ESC" + + if not repeat_set and sim._render_window is not None: + try: + pygame.key.set_repeat(180, 35) + except pygame.error: + pass + repeat_set = True + + if _IS_WINDOWS: + up_pressed = _win_key_pressed(_VK_W) or _win_key_pressed(_VK_UP) + down_pressed = _win_key_pressed(_VK_S) or _win_key_pressed(_VK_DOWN) + left_pressed = _win_key_pressed(_VK_A) or _win_key_pressed(_VK_LEFT) + right_pressed = _win_key_pressed(_VK_D) or _win_key_pressed(_VK_RIGHT) + home_pressed = _win_key_pressed(_VK_HOME) + reset_pressed = _win_key_pressed(_VK_R) + esc_pressed = _win_key_pressed(_VK_ESC) + shift_pressed = _win_key_pressed(_VK_SHIFT) + else: + up_pressed = key_state["up"] + down_pressed = key_state["down"] + left_pressed = key_state["left"] + right_pressed = key_state["right"] + home_pressed = key_state["home"] + reset_pressed = key_state["reset"] + esc_pressed = key_state["esc"] + shift_pressed = bool(pygame.key.get_mods() & pygame.KMOD_SHIFT) + + mul = 2.2 if shift_pressed else 1.0 + step = args.step * mul + + if esc_pressed: + running = False + continue + + if reset_pressed: + if ep_dists: + all_episodes.append(interaction_metrics.collect_episode_metrics( + task_name, ep_dists, ep_insides, targets_hit)) + ep_dists, ep_insides, targets_hit = [], [], 0 + off_y, off_z = 0.0, 0.0 + _reset_to_ref(sim, arm_ref) + episode_idx += 1 + if not _IS_WINDOWS: + key_state["reset"] = False + continue + + if home_pressed: + off_y, off_z = 0.0, 0.0 + else: + if left_pressed: + off_y -= step + if right_pressed: + off_y += step + if args.return_rate > 0.0 and not (left_pressed or right_pressed): + off_y *= max(0.0, 1.0 - args.return_rate) + if up_pressed: + off_z += step + if down_pressed: + off_z -= step + if args.return_rate > 0.0 and not (up_pressed or down_pressed): + off_z *= max(0.0, 1.0 - args.return_rate) + + off_y = float(np.clip(off_y, -_OFFSET_LIMIT, _OFFSET_LIMIT)) + off_z = float(np.clip(off_z, -_OFFSET_LIMIT, _OFFSET_LIMIT)) + if abs(off_y) < 1e-4: + off_y = 0.0 + if abs(off_z) < 1e-4: + off_z = 0.0 + + arm_q = arm_ref + off_y * _Y_VECTOR + off_z * _Z_VECTOR + arm_q = np.clip(arm_q, arm_lo, arm_hi) + _apply_pose(sim, arm_q) + + _obs, reward, terminated, truncated, _info = sim.step(action) + + state = sim.get_state() + dist = float(state.get("dist_to_target_center", float("nan"))) + inside = bool(state.get("inside_target", False)) + hit = bool(state.get("target_hit", False)) + t_hit = int(state.get("targets_hit", 0)) + t_radius = float(state.get("target_radius", 0.03)) + contact = bool(state.get("object_contact", False)) + + if dist == dist: + ep_dists.append(dist) + ep_insides.append(inside) + if hit: + targets_hit = t_hit + flash_timer = 15 + + if flash_timer > 0: + flash_timer -= 1 + hud_color = (0, 255, 80) if flash_timer > 0 else (255, 255, 255) + + focus_ok = bool(pygame.key.get_focused()) + keys_now: list[str] = [] + if up_pressed: + keys_now.append("UP") + if down_pressed: + keys_now.append("DOWN") + if left_pressed: + keys_now.append("LEFT") + if right_pressed: + keys_now.append("RIGHT") + if home_pressed: + keys_now.append("HOME") + if reset_pressed: + keys_now.append("RESET") + if esc_pressed: + keys_now.append("ESC") + key_text = ",".join(keys_now) if keys_now else "-" + last_key_event = key_text + + hud = [ + f"Ep {episode_idx} | dist={dist:.4f} radius={t_radius:.3f}", + f"inside={inside} contact={contact} hits={targets_hit}", + f"pos_y={off_y:+.2f} pos_z={off_z:+.2f} r={reward:.3f}", + f"hold={'ON' if args.return_rate <= 0.0 else 'OFF'} focus={'YES' if focus_ok else 'NO '} keys={key_text}", + f"last={last_key_event}", + f"{args.layout.upper()} move | Shift fast | Home center | R new | ESC quit", + ] + if ep_dists: + stats = interaction_metrics.scalar_stats(ep_dists) + hud.append(f"RMSE={stats['rmse']:.4f} MAE={stats['mae']:.4f}") + if flash_timer > 0: + hud.insert(0, "*** TOUCH! ***") + + if sim._render_window is not None: + _draw_hud(pygame.display.get_surface(), hud, hud_color) + pygame.display.flip() + + if terminated or truncated: + if ep_dists: + all_episodes.append(interaction_metrics.collect_episode_metrics( + task_name, ep_dists, ep_insides, targets_hit)) + ep_dists, ep_insides, targets_hit = [], [], 0 + off_y, off_z = 0.0, 0.0 + _reset_to_ref(sim, arm_ref) + episode_idx += 1 + + finally: + if ep_dists: + all_episodes.append(interaction_metrics.collect_episode_metrics( + task_name, ep_dists, ep_insides, targets_hit)) + if all_episodes: + interaction_metrics.print_run_summary(task_name, all_episodes, csv_path=args.csv) + sim.close() + + return 0 + + +if __name__ == "__main__": + raise SystemExit(main())