diff --git a/docs/src/evaluation.md b/docs/src/evaluation.md index 0b73228aa5..00a23ed1a2 100644 --- a/docs/src/evaluation.md +++ b/docs/src/evaluation.md @@ -2,7 +2,77 @@ Driving is a safety-critical multi-agent application, making careful evaluation and risk assessment essential. Mistakes in the real world are costly, so simulations are used to catch errors before deployment. To support rapid iteration, evaluations should run efficiently, so we also paid attention to optimizing their speed. This page contains an overview of the available benchmarks and evals. -## Sanity maps 🐛 +## Evaluation during training + +PufferDrive supports running evaluations automatically during training. There are six evaluation types that can run periodically: + +| Eval type | What it does | CLI flag to enable | Interval flag | +|---|---|---|---| +| **Render** | Records top-down and agent-view videos | `--train.render True` | `--train.render-interval N` | +| **Safe eval render** | Records videos with safe reward conditioning | `--safe-eval.enabled True` | `--safe-eval.interval N` | +| **Safe eval metrics** | Runs policy in subprocess, logs driving metrics | `--safe-eval.enabled True` | `--safe-eval.interval N` | +| **WOSAC realism** | Measures distributional realism (WOSAC benchmark) | `--eval.wosac-realism-eval True` | `--eval.eval-interval N` | +| **Human replay render** | Records videos with policy-controlled SDC + replayed humans | `--eval.human-replay-eval True` | `--eval.eval-interval N` | +| **Human replay metrics** | Logs collision/offroad/completion rates vs human replays | `--eval.human-replay-eval True` | `--eval.eval-interval N` | + +All eval types trigger at `epoch % interval == 0`. They require a saved checkpoint, so **`checkpoint-interval` must be <= the smallest eval interval**. 
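The interval gating and checkpoint constraint above can be sketched in a few lines of Python. This is an illustrative sketch only; `evals_due` and `validate_checkpointing` are hypothetical helpers, not part of PufferDrive:

```python
# Hypothetical helpers illustrating the eval scheduling rule described above.
def evals_due(epoch, intervals):
    """Return the eval names that trigger at this epoch (epoch % interval == 0)."""
    return [name for name, n in intervals.items() if epoch % n == 0]

def validate_checkpointing(checkpoint_interval, intervals):
    """Evals load the latest checkpoint, so one must be saved at least as often
    as the most frequent eval fires."""
    return checkpoint_interval <= min(intervals.values())

intervals = {"render": 250, "safe_eval": 250, "wosac": 500}
assert evals_due(500, intervals) == ["render", "safe_eval", "wosac"]
assert evals_due(250, intervals) == ["render", "safe_eval"]
assert validate_checkpointing(250, intervals)
assert not validate_checkpointing(1000, intervals)  # checkpoints too infrequent
```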
+ +### Example: enable all evals + +```bash +puffer train puffer_drive \ + --wandb --wandb-project pufferdrive \ + --train.checkpoint-interval 250 \ + --train.render True --train.render-interval 250 \ + --safe-eval.enabled True --safe-eval.interval 250 \ + --eval.wosac-realism-eval True \ + --eval.human-replay-eval True \ + --eval.eval-interval 250 +``` + +### Safe eval + +Safe eval measures how well the policy drives when given "safe" reward conditioning values (high penalties for collisions and offroad driving, rewards for lane keeping). It runs in a **separate subprocess** that loads the latest checkpoint, creates a fresh environment, and collects metrics over multiple episodes. + +The safe eval subprocess inherits the training environment configuration (map directory, reward bounds, etc.) but overrides a few parameters: + +- `num_agents`: Number of agents in the eval environment (default: 64) +- `episode_length`: How long each eval episode runs (default: 1000 steps) +- `num_episodes`: How many episode completions to collect before reporting (default: 100) +- `resample_frequency`: Automatically set to 0 (disabled) so episodes can run to completion + +Metrics logged to wandb under `eval/*`: + +- `eval/score`, `eval/collision_rate`, `eval/offroad_rate` +- `eval/completion_rate`, `eval/dnf_rate` +- `eval/episode_length`, `eval/episode_return` +- `eval/lane_alignment_rate`, `eval/lane_center_rate` +- And more (see `drive.h` `Log` struct for the full list) + +Configure safe eval reward conditioning in `drive.ini` under `[safe_eval]`: + +```ini +[safe_eval] +enabled = True +interval = 250 +num_agents = 64 +num_episodes = 100 +episode_length = 1000 + +; Fixed reward conditioning values (min=max pins the value) +collision = -3.0 +offroad = -3.0 +overspeed = -1.0 +traffic_light = -1.0 +lane_align = 0.025 +velocity = 0.005 +``` + +### Async vs sync evaluation + +By default, all evals run synchronously (blocking training until they finish). 
Set `--train.render-async True` to run video renders in separate processes, and `--eval.eval-async True` to run metric evals (safe eval, WOSAC, human replay) in background threads. When async, results are queued and logged to wandb on the main thread during the next training epoch. + +## Sanity maps Quickly test the training on curated, lightweight scenarios without downloading the full dataset. Each sanity map tests a specific behavior. @@ -33,7 +103,7 @@ Available maps: ![Sanity map gallery placeholder](images/maps_screenshot.png) -## Distributional realism benchmark 📊 +## Distributional realism benchmark (WOSAC) We provide a PufferDrive implementation of the Waymo Open Sim Agents Challenge (WOSAC) for fast, easy evaluation of how well your trained agent matches distributional properties of human behavior. @@ -45,7 +115,7 @@ Add `--load-model-path .pt` to score a trained policy, inste See [the WOSAC benchmark page](wosac.md) for the metric pipeline and all the details. -## Human-compatibility benchmark 🤝 +## Human-compatibility benchmark You may be interested in how compatible your agent is with human partners. For this purpose, we support an eval where your policy only controls the self-driving car (SDC). The rest of the agents in the scene are stepped using the logs. While it is not a perfect eval since the human partners here are static, it will still give you a sense of how closely aligned your agent's behavior is to how people drive. 
You can run it like this: diff --git a/pufferlib/config/ocean/drive.ini b/pufferlib/config/ocean/drive.ini index b7a2e4f209..f3e093a5c3 100644 --- a/pufferlib/config/ocean/drive.ini +++ b/pufferlib/config/ocean/drive.ini @@ -158,7 +158,7 @@ vtrace_rho_clip = 1 checkpoint_interval = 1000 ; Rendering options render = True -render_async = False # Render interval of below 50 might cause process starvation and slowness in training +render_async = True render_interval = 1000 ; If True, show exactly what the agent sees in agent observation obs_only = True @@ -175,6 +175,8 @@ render_map = none [eval] eval_interval = 1000 +; If True, run eval subprocesses (wosac, human replay, safe eval metrics) in background threads +eval_async = False ; Path to dataset used for evaluation map_dir = "resources/drive/binaries/training" ; Number of scenarios to process per batch @@ -204,14 +206,54 @@ wosac_goal_radius = 2.0 wosac_sanity_check = False ; Only return aggregate results across all scenes wosac_aggregate_results = True +; Episode length for WOSAC eval (ground truth logs are 9.1s at 10Hz = 91 steps) +wosac_episode_length = 91 ; Evaluation mode: "policy", "ground_truth" wosac_eval_mode = "policy" ; If True, enable human replay evaluation (pair policy-controlled agent with human replays) -human_replay_eval = False +human_replay_eval = True +; Number of agents for human replay evaluation +human_replay_num_agents = 16 ; Control only the self-driving car human_replay_control_mode = "control_sdc_only" -; Number of scenarios for human replay evaluation equals the number of agents -human_replay_num_agents = 16 + +[safe_eval] +; If True, periodically run policy with safe/law-abiding reward conditioning and log videos + metrics +enabled = True +; How often to run safe eval (in training epochs). Defaults to render_interval. 
+interval = 250 +; Number of agents to run in the eval environment +num_agents = 64 +; Number of episodes to collect metrics over +num_episodes = 100 +; episode length +episode_length = 1000 +min_goal_distance = 0.5 +max_goal_distance = 1000.0 + +; Reward conditioning values (min=max to fix the value). +; Names match the env reward_bound_* keys. +; High penalties for unsafe behavior +collision = -3.0 +offroad = -3.0 +overspeed = -1.0 +traffic_light = -1.0 +reverse = -0.0075 +comfort = -0.1 + +; Standard driving rewards +goal_radius = 2.0 +lane_align = 0.025 +lane_center = -0.00075 +velocity = 0.005 +center_bias = 0.0 +vel_align = 1.0 +timestep = -0.00005 + +; Neutral scaling factors +throttle = 1.0 +steer = 1.0 +acc = 1.0 [render] ; Mode to render a bunch of maps with a given policy diff --git a/pufferlib/ocean/drive/binding.c b/pufferlib/ocean/drive/binding.c index 729a112459..8990d8e770 100644 --- a/pufferlib/ocean/drive/binding.c +++ b/pufferlib/ocean/drive/binding.c @@ -245,6 +245,7 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { free(env->agents); free(env->road_elements); free(env->road_scenario_ids); + free(env->tracks_to_predict_indices); free(env->active_agent_indices); free(env->static_agent_indices); free(env->expert_static_agent_indices); @@ -257,14 +258,6 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { return NULL; } - // Store map_id - PyObject *map_id_obj = PyLong_FromLong(map_id); - PyList_SetItem(map_ids, env_count, map_id_obj); - // Store agent offset - PyObject *offset = PyLong_FromLong(total_agent_count); - PyList_SetItem(agent_offsets, env_count, offset); - total_agent_count += env->active_agent_count; - env_count++; for (int j = 0; j < env->num_objects; j++) { free_agent(&env->agents[j]); } @@ -274,12 +267,36 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { free(env->agents); free(env->road_elements); free(env->road_scenario_ids); + 
free(env->tracks_to_predict_indices); free(env->active_agent_indices); free(env->static_agent_indices); free(env->expert_static_agent_indices); free(env); continue; } + + // Map has active agents — record it + PyObject *map_id_obj = PyLong_FromLong(map_id); + PyList_SetItem(map_ids, env_count, map_id_obj); + PyObject *offset = PyLong_FromLong(total_agent_count); + PyList_SetItem(agent_offsets, env_count, offset); + total_agent_count += env->active_agent_count; + env_count++; + + for (int j = 0; j < env->num_objects; j++) { + free_agent(&env->agents[j]); + } + for (int j = 0; j < env->num_roads; j++) { + free_road_element(&env->road_elements[j]); + } + free(env->agents); + free(env->road_elements); + free(env->road_scenario_ids); + free(env->tracks_to_predict_indices); + free(env->active_agent_indices); + free(env->static_agent_indices); + free(env->expert_static_agent_indices); + free(env); } if (total_agent_count >= num_agents) { diff --git a/pufferlib/ocean/drive/drive.h b/pufferlib/ocean/drive/drive.h index 3a5c0e468f..373c32a583 100644 --- a/pufferlib/ocean/drive/drive.h +++ b/pufferlib/ocean/drive/drive.h @@ -2634,16 +2634,103 @@ void compute_observations(Drive *env) { } } +// Find a random collision-free position on a drivable lane for an existing agent. +// Returns true if a valid position was found and updates the agent's sim_x/y/z/heading. 
+static bool randomize_agent_position(Drive *env, int agent_idx) { + Agent *agent = &env->agents[agent_idx]; + + // Pre-compute drivable lanes + int drivable_lanes[env->num_roads]; + float lane_lengths[env->num_roads]; + int num_drivable = 0; + float total_lane_length = 0.0f; + for (int i = 0; i < env->num_roads; i++) { + if (env->road_elements[i].type == ROAD_LANE && env->road_elements[i].polyline_length > 0.0f) { + drivable_lanes[num_drivable] = i; + lane_lengths[num_drivable] = env->road_elements[i].polyline_length; + total_lane_length += lane_lengths[num_drivable]; + num_drivable++; + } + } + + if (num_drivable == 0) return false; + + for (int attempt = 0; attempt < MAX_SPAWN_ATTEMPTS; attempt++) { + // Length-weighted lane selection + float r = ((float)rand() / (float)RAND_MAX) * total_lane_length; + float cumulative = 0.0f; + int selected = num_drivable - 1; + for (int k = 0; k < num_drivable; k++) { + cumulative += lane_lengths[k]; + if (r < cumulative) { + selected = k; + break; + } + } + RoadMapElement *lane = &env->road_elements[drivable_lanes[selected]]; + + float spawn_x, spawn_y, spawn_z, spawn_heading; + get_random_point_on_lane(lane, &spawn_x, &spawn_y, &spawn_z, &spawn_heading); + spawn_z += agent->sim_height / 2.0f; + + // Temporarily invalidate this agent so check_spawn_collision skips it + float saved_x = agent->sim_x; + agent->sim_x = INVALID_POSITION; + bool collision = check_spawn_collision(env, env->active_agent_count, spawn_x, spawn_y, spawn_z, + spawn_heading, agent->sim_length, agent->sim_width, agent->sim_height); + agent->sim_x = saved_x; + if (collision) continue; + + // Check offroad + if (check_spawn_offroad(env, spawn_x, spawn_y, spawn_z, spawn_heading, + agent->sim_length, agent->sim_width, agent->sim_height)) + continue; + + agent->sim_x = spawn_x; + agent->sim_y = spawn_y; + agent->sim_z = spawn_z; + agent->sim_heading = spawn_heading; + agent->heading_x = cosf(spawn_heading); + agent->heading_y = sinf(spawn_heading); + // Update 
stored initial position so future non-random resets are consistent + agent->log_trajectory_x[0] = spawn_x; + agent->log_trajectory_y[0] = spawn_y; + agent->log_trajectory_z[0] = spawn_z; + agent->log_heading[0] = spawn_heading; + return true; + } + return false; +} + void respawn_agent(Drive *env, int agent_idx) { Agent *agent = &env->agents[agent_idx]; - agent->sim_x = agent->log_trajectory_x[0]; - agent->sim_y = agent->log_trajectory_y[0]; - agent->sim_z = agent->log_trajectory_z[0]; - agent->sim_heading = agent->log_heading[0]; - agent->heading_x = cosf(agent->sim_heading); - agent->heading_y = sinf(agent->sim_heading); - agent->sim_vx = agent->log_velocity_x[0]; - agent->sim_vy = agent->log_velocity_y[0]; + + if (env->init_mode == INIT_VARIABLE_AGENT_NUMBER) { + if (!randomize_agent_position(env, agent_idx)) { + // Fallback to original position if no valid spawn found + agent->sim_x = agent->log_trajectory_x[0]; + agent->sim_y = agent->log_trajectory_y[0]; + agent->sim_z = agent->log_trajectory_z[0]; + agent->sim_heading = agent->log_heading[0]; + agent->heading_x = cosf(agent->sim_heading); + agent->heading_y = sinf(agent->sim_heading); + } + // Sample a new goal relative to the new position + sample_new_goal(env, agent_idx); + agent->sim_vx = 0.0f; + agent->sim_vy = 0.0f; + agent->sim_speed = 0.0f; + agent->sim_speed_signed = 0.0f; + } else { + agent->sim_x = agent->log_trajectory_x[0]; + agent->sim_y = agent->log_trajectory_y[0]; + agent->sim_z = agent->log_trajectory_z[0]; + agent->sim_heading = agent->log_heading[0]; + agent->heading_x = cosf(agent->sim_heading); + agent->heading_y = sinf(agent->sim_heading); + agent->sim_vx = agent->log_velocity_x[0]; + agent->sim_vy = agent->log_velocity_y[0]; + } agent->metrics_array[COLLISION_IDX] = 0.0f; agent->metrics_array[OFFROAD_IDX] = 0.0f; agent->metrics_array[REACHED_GOAL_IDX] = 0.0f; @@ -2908,8 +2995,21 @@ void move_dynamics(Drive *env, int action_idx, int agent_idx) { void c_reset(Drive *env) { env->timestep 
= env->init_steps; - set_start_position(env); - reset_goal_positions(env); + if (env->init_mode == INIT_VARIABLE_AGENT_NUMBER) { + // Randomize all agent positions on reset + for (int x = 0; x < env->active_agent_count; x++) { + int agent_idx = env->active_agent_indices[x]; + randomize_agent_position(env, agent_idx); + } + // Sample new goals relative to new positions + for (int x = 0; x < env->active_agent_count; x++) { + int agent_idx = env->active_agent_indices[x]; + sample_new_goal(env, agent_idx); + } + } else { + set_start_position(env); + reset_goal_positions(env); + } for (int x = 0; x < env->active_agent_count; x++) { env->logs[x] = (Log){0}; int agent_idx = env->active_agent_indices[x]; @@ -2939,7 +3039,7 @@ void c_reset(Drive *env) { agent->prev_goal_z = agent->sim_z; generate_reward_coefs(env, agent); - if (env->goal_behavior == GOAL_GENERATE_NEW) { + if (env->goal_behavior == GOAL_GENERATE_NEW && env->init_mode != INIT_VARIABLE_AGENT_NUMBER) { agent->goal_position_x = agent->init_goal_x; agent->goal_position_y = agent->init_goal_y; agent->goal_position_z = agent->init_goal_z; diff --git a/pufferlib/ocean/drive/visualize.c b/pufferlib/ocean/drive/visualize.c index ccbe58317a..a3fca755f6 100644 --- a/pufferlib/ocean/drive/visualize.c +++ b/pufferlib/ocean/drive/visualize.c @@ -193,11 +193,11 @@ static int make_gif_from_frames(const char *pattern, int fps, const char *palett int eval_gif(const char *map_name, const char *policy_name, int show_grid, int obs_only, int lasers, int show_human_logs, int frame_skip, const char *view_mode, const char *output_topdown, - const char *output_agent, int num_maps, int zoom_in) { + const char *output_agent, int num_maps, int zoom_in, const char *config_path, float render_scale) { // Parse configuration from INI file env_init_config conf = {0}; - const char *ini_file = "pufferlib/config/ocean/drive.ini"; + const char *ini_file = config_path ? 
config_path : "pufferlib/config/ocean/drive.ini"; if (ini_parse(ini_file, handler, &conf) < 0) { fprintf(stderr, "Error: Could not load %s. Cannot determine environment configuration.\n", ini_file); return -1; @@ -316,7 +316,7 @@ int eval_gif(const char *map_name, const char *policy_name, int show_grid, int o float map_height = env.grid_map->top_left_y - env.grid_map->bottom_right_y; printf("Map size: %.1fx%.1f\n", map_width, map_height); - float scale = 6.0f; + float scale = render_scale > 0 ? render_scale : 6.0f; int img_width = (int)roundf(map_width * scale / 2.0f) * 2; int img_height = (int)roundf(map_height * scale / 2.0f) * 2; @@ -446,13 +446,23 @@ int eval_gif(const char *map_name, const char *policy_name, int show_grid, int o } int main(int argc, char *argv[]) { + // Scan for --config first so INI parsing uses the right file + const char *config_path = NULL; + for (int i = 1; i < argc; i++) { + if (strcmp(argv[i], "--config") == 0 && i + 1 < argc) { + config_path = argv[i + 1]; + break; + } + } + // Parse configuration from INI file - env_init_config conf = {0}; // Initialize to zero - const char *ini_file = "pufferlib/config/ocean/drive.ini"; + env_init_config conf = {0}; + const char *ini_file = config_path ? config_path : "pufferlib/config/ocean/drive.ini"; if (ini_parse(ini_file, handler, &conf) < 0) { fprintf(stderr, "Error: Could not load %s. 
Cannot determine environment configuration.\n", ini_file); return -1; } + int show_grid = 0; int obs_only = 0; int lasers = 0; @@ -467,6 +477,7 @@ int main(int argc, char *argv[]) { const char *output_topdown = NULL; const char *output_agent = NULL; int num_maps = conf.num_maps; + float render_scale = 0; // Parse command line arguments for (int i = 1; i < argc; i++) { @@ -532,10 +543,20 @@ int main(int argc, char *argv[]) { num_maps = atoi(argv[i + 1]); i++; } + } else if (strcmp(argv[i], "--config") == 0) { + // Already handled in pre-scan above + if (i + 1 < argc) { + i++; + } + } else if (strcmp(argv[i], "--scale") == 0) { + if (i + 1 < argc) { + render_scale = atof(argv[i + 1]); + i++; + } } } eval_gif(map_name, policy_name, show_grid, obs_only, lasers, show_human_logs, frame_skip, view_mode, output_topdown, - output_agent, num_maps, zoom_in); + output_agent, num_maps, zoom_in, config_path, render_scale); return 0; } diff --git a/pufferlib/pufferl.py b/pufferlib/pufferl.py index 25fcfcc96a..1cc3cd8ce0 100644 --- a/pufferlib/pufferl.py +++ b/pufferlib/pufferl.py @@ -17,6 +17,7 @@ import subprocess import argparse import importlib +import json import configparser from threading import Thread from collections import defaultdict, deque @@ -134,6 +135,10 @@ def __init__(self, config, vecenv, policy, logger=None): if self.render_async: self.render_queue = multiprocessing.Queue() self.render_processes = [] + self._render_proc_temp_files = {} # pid -> [temp_file_paths] + + self.eval_threads = [] + self._eval_results_queue = queue.Queue() # thread-safe queue for async eval metrics # LSTM if config["use_rnn"]: @@ -202,9 +207,7 @@ def __init__(self, config, vecenv, policy, logger=None): self.logger = logger if logger is None: self.logger = NoLogger(config) - if self.render_async and hasattr(self.logger, "wandb") and self.logger.wandb: - self.logger.wandb.define_metric("render_step", hidden=True) - self.logger.wandb.define_metric("render/*", step_metric="render_step") + # 
No custom step_metric needed — all eval types log with step=global_step # Learning rate scheduler epochs = config["total_timesteps"] // config["batch_size"] @@ -514,113 +517,232 @@ def train(self): self.save_checkpoint() self.msg = f"Checkpoint saved at update {self.epoch}" - if self.render and self.epoch % self.render_interval == 0: - model_dir = os.path.join(self.config["data_dir"], f"{self.config['env']}_{self.logger.run_id}") - model_files = glob.glob(os.path.join(model_dir, "model_*.pt")) + # Evals only run on rank 0 in distributed training + is_rank0 = not torch.distributed.is_initialized() or torch.distributed.get_rank() == 0 + + # Render and safe eval run on their own intervals, independent of checkpointing. + # They use the latest available checkpoint, so they don't need a fresh one. + should_render = is_rank0 and self.render and self.epoch % self.render_interval == 0 + safe_eval_config = self.config.get("safe_eval", {}) + run_safe_eval = safe_eval_config.get("enabled", False) + safe_eval_interval = safe_eval_config.get("interval", self.render_interval) + should_safe_eval = is_rank0 and run_safe_eval and self.epoch % safe_eval_interval == 0 + eval_interval = self.config["eval"]["eval_interval"] + should_wosac = ( + is_rank0 + and self.config["eval"]["wosac_realism_eval"] + and (self.epoch % eval_interval == 0 or done_training) + ) + should_human_replay = ( + is_rank0 and self.config["eval"]["human_replay_eval"] and (self.epoch % eval_interval == 0 or done_training) + ) - if model_files: - # Take the latest checkpoint - latest_cpt = max(model_files, key=os.path.getctime) - bin_path = f"{model_dir}.bin" + # Any render-based eval needs a .bin export of the current policy + needs_bin = should_render or should_safe_eval or should_human_replay + + if needs_bin: + model_dir = os.path.join(self.config["data_dir"], f"{self.config['env']}_{self.logger.run_id}") + model_files = glob.glob(os.path.join(model_dir, "model_*.pt")) + + if model_files: + latest_cpt = 
max(model_files) + bin_path = f"{model_dir}.bin" + + export_args = {"env_name": self.config["env"], "load_model_path": latest_cpt, **self.config} + try: + export( + args=export_args, + env_name=self.config["env"], + vecenv=self.vecenv, + policy=self.uncompiled_policy, + path=bin_path, + silent=True, + ) + + env_cfg = getattr(self.vecenv, "driver_env", None) + wandb_log = bool(hasattr(self.logger, "wandb") and self.logger.wandb) + wandb_run = self.logger.wandb if hasattr(self.logger, "wandb") else None + + if should_render: + self._dispatch_render(model_dir, bin_path, env_cfg, wandb_log, wandb_run, "render") + + if should_safe_eval: + safe_ini_path = pufferlib.utils.generate_safe_eval_ini(safe_eval_config) + self._dispatch_render( + model_dir, + bin_path, + env_cfg, + wandb_log, + wandb_run, + "eval", + config_path=safe_ini_path, + wandb_prefix="eval", + ) + self._run_eval( + pufferlib.utils.run_safe_eval_metrics_in_subprocess, + self.config, + self.logger, + self.global_step, + safe_eval_config, + ) - # Export to .bin for rendering with raylib - try: - export_args = {"env_name": self.config["env"], "load_model_path": latest_cpt, **self.config} - - export( - args=export_args, - env_name=self.config["env"], - vecenv=self.vecenv, - policy=self.uncompiled_policy, - path=bin_path, - silent=True, + if should_human_replay: + hr_ini_path = pufferlib.utils.generate_human_replay_ini(self.config["eval"]) + self._dispatch_render( + model_dir, + bin_path, + env_cfg, + wandb_log, + wandb_run, + "human_replay", + config_path=hr_ini_path, + wandb_prefix="human_replay", ) + finally: + if os.path.exists(bin_path): + os.remove(bin_path) + + # WOSAC and human replay metric subprocesses (don't need bin_path, + # they load from checkpoint via _run_eval_subprocess) + if should_wosac: + self._run_eval( + pufferlib.utils.run_wosac_eval_in_subprocess, + self.config, + self.logger, + self.global_step, + ) - bin_path_epoch = f"{model_dir}_epoch_{self.epoch:06d}.bin" - shutil.copy2(bin_path, 
bin_path_epoch) - - env_cfg = getattr(self.vecenv, "driver_env", None) - wandb_log = True if hasattr(self.logger, "wandb") and self.logger.wandb else False - wandb_run = self.logger.wandb if hasattr(self.logger, "wandb") else None - if self.render_async: - # Clean up finished processes - self.render_processes = [p for p in self.render_processes if p.is_alive()] - - # Cap the number of processes to num_workers - max_processes = self.config.get("num_workers", 1) - if len(self.render_processes) >= max_processes: - print("Waiting for render processes to finish...") - while len(self.render_processes) >= max_processes: - time.sleep(1) - self.render_processes = [p for p in self.render_processes if p.is_alive()] - - render_proc = multiprocessing.Process( - target=pufferlib.utils.render_videos, - args=( - self.config, - env_cfg, - self.logger.run_id, - wandb_log, - self.epoch, - self.global_step, - bin_path_epoch, - self.render_async, - self.render_queue, - ), - ) - render_proc.start() - self.render_processes.append(render_proc) - else: - pufferlib.utils.render_videos( - self.config, - env_cfg, - self.logger.run_id, - wandb_log, - self.epoch, - self.global_step, - bin_path_epoch, - self.render_async, - wandb_run=wandb_run, - ) - - except Exception as e: - print(f"Failed to export model weights: {e}") - - if self.config["eval"]["wosac_realism_eval"] and ( - (self.epoch - 1) % self.config["eval"]["eval_interval"] == 0 or done_training - ): - pufferlib.utils.run_wosac_eval_in_subprocess(self.config, self.logger, self.global_step) - - if self.config["eval"]["human_replay_eval"] and ( - (self.epoch - 1) % self.config["eval"]["eval_interval"] == 0 or done_training - ): - pufferlib.utils.run_human_replay_eval_in_subprocess(self.config, self.logger, self.global_step) + if should_human_replay: + self._run_eval( + pufferlib.utils.run_human_replay_eval_in_subprocess, + self.config, + self.logger, + self.global_step, + ) + + def _dispatch_render( + self, model_dir, bin_path, env_cfg, 
wandb_log, wandb_run, suffix, config_path=None, wandb_prefix=None + ): + """Dispatch a render_videos call, either async (multiprocessing) or sync.""" + extra_kwargs = {} + if config_path is not None: + extra_kwargs["config_path"] = config_path + if wandb_prefix is not None: + extra_kwargs["wandb_prefix"] = wandb_prefix + + if self.render_async: + self._cleanup_dead_render_processes() + max_processes = self.config.get("num_workers", 1) + while len(self.render_processes) >= max_processes: + time.sleep(1) + self._cleanup_dead_render_processes() + + bin_copy = f"{model_dir}_epoch_{self.epoch:06d}_{suffix}.bin" + shutil.copy2(bin_path, bin_copy) + proc = multiprocessing.Process( + target=pufferlib.utils.render_videos, + args=( + self.config, + env_cfg, + self.logger.run_id, + wandb_log, + self.epoch, + self.global_step, + bin_copy, + True, + self.render_queue, + ), + kwargs=extra_kwargs, + ) + proc.start() + temp_files = [bin_copy] + if config_path: + temp_files.append(config_path) + self._render_proc_temp_files[proc.pid] = temp_files + self.render_processes.append(proc) + else: + pufferlib.utils.render_videos( + self.config, + env_cfg, + self.logger.run_id, + wandb_log, + self.epoch, + self.global_step, + bin_path, + False, + wandb_run=wandb_run, + **extra_kwargs, + ) + if config_path and os.path.exists(config_path): + os.remove(config_path) + + def _run_eval(self, fn, *args, **kwargs): + """Run an eval function, optionally in a background thread. + + Injects results_queue so eval functions put metrics on the queue + instead of logging directly. The main thread drains the queue in + mean_and_log(). 
+ """ + kwargs["results_queue"] = self._eval_results_queue + eval_async = self.config.get("eval", {}).get("eval_async", False) + # Handle string "False"/"True" from INI config + if isinstance(eval_async, str): + eval_async = eval_async.lower() not in ("false", "0", "no", "") + if eval_async: + # Clean up finished threads + self.eval_threads = [t for t in self.eval_threads if t.is_alive()] + t = Thread(target=fn, args=args, kwargs=kwargs, daemon=True) + t.start() + self.eval_threads.append(t) + else: + fn(*args, **kwargs) + + def _cleanup_dead_render_processes(self): + """Remove dead render processes and clean up their temp files.""" + if not hasattr(self, "render_processes"): + return + alive = [] + for p in self.render_processes: + if p.is_alive(): + alive.append(p) + else: + # Process finished (or crashed) — clean up its temp files + for f in self._render_proc_temp_files.pop(p.pid, []): + if os.path.exists(f): + try: + os.remove(f) + except OSError: + pass + self.render_processes = alive def check_render_queue(self): """Check if any async render jobs finished and log them.""" if not self.render_async or not hasattr(self, "render_queue"): return + self._cleanup_dead_render_processes() + try: while not self.render_queue.empty(): result = self.render_queue.get_nowait() step = result["step"] videos = result["videos"] + prefix = result.get("wandb_prefix", "render") - # Log to wandb if available if hasattr(self.logger, "wandb") and self.logger.wandb: import wandb payload = {} if videos["output_topdown"]: - payload["render/world_state"] = [wandb.Video(p, format="mp4") for p in videos["output_topdown"]] + payload[f"{prefix}/world_state"] = [ + wandb.Video(p, format="mp4") for p in videos["output_topdown"] + ] if videos["output_agent"]: - payload["render/agent_view"] = [wandb.Video(p, format="mp4") for p in videos["output_agent"]] + payload[f"{prefix}/agent_view"] = [wandb.Video(p, format="mp4") for p in videos["output_agent"]] if payload: - # Custom step for render logs 
to prevent monotonic logic wandb errors - payload["render_step"] = step - self.logger.wandb.log(payload) + payload["train_step"] = step + self._eval_results_queue.put(payload) except queue.Empty: pass @@ -633,14 +755,11 @@ def mean_and_log(self): config = self.config for k in list(self.stats.keys()): - v = self.stats[k] try: - v = np.mean(v) - except: + self.stats[k] = np.mean(self.stats[k]) + except Exception: del self.stats[k] - self.stats[k] = v - device = config["device"] agent_steps = int(dist_sum(self.global_step, device)) logs = { @@ -661,30 +780,70 @@ def mean_and_log(self): return None self.logger.log(logs, agent_steps) + + # Drain eval results queue (populated by async eval threads and render processes) + while True: + try: + payload = self._eval_results_queue.get_nowait() + if hasattr(self.logger, "wandb") and self.logger.wandb: + self.logger.wandb.log(payload) + except queue.Empty: + break + return logs def close(self): self.vecenv.close() self.utilization.stop() - if self.render_async: # Ensure all render processes are properly terminated before closing the queue + # Wait briefly for any background eval threads to finish. + # These are daemon threads, so they'll die when the process exits. + for t in self.eval_threads: + t.join(timeout=10) + if t.is_alive(): + log.warning(f"Eval thread {t.name} still running after 10s, abandoning") + self.eval_threads = [] + + if self.render_async: + # Wait for in-flight render processes to finish so their results + # reach the queue and get uploaded to wandb. 
if hasattr(self, "render_processes"): for p in self.render_processes: try: + p.join(timeout=120) if p.is_alive(): + print(f"Render process {p.pid} did not finish in time, terminating") p.terminate() p.join(timeout=5) if p.is_alive(): p.kill() except Exception: - # Best-effort cleanup; avoid letting close() crash on process errors - print(f"Failed to terminate render process {p.pid}") - # Optionally clear the list to drop references to finished processes - self.render_processes = [] + print(f"Failed to clean up render process {p.pid}") + # Drain the render queue — moves results to _eval_results_queue + self.check_render_queue() + # Clean up any remaining temp files (from crashed processes) + for pid, files in self._render_proc_temp_files.items(): + for f in files: + if os.path.exists(f): + try: + os.remove(f) + except OSError: + pass + self._render_proc_temp_files.clear() + self.render_processes = [] if hasattr(self, "render_queue"): self.render_queue.close() self.render_queue.join_thread() + # Final drain of eval results queue before finishing wandb + while True: + try: + payload = self._eval_results_queue.get_nowait() + if hasattr(self.logger, "wandb") and self.logger.wandb: + self.logger.wandb.log(payload) + except queue.Empty: + break + model_path = self.save_checkpoint() run_id = self.logger.run_id project_name = "puffer_drive" @@ -1139,7 +1298,13 @@ def train(env_name, args=None, vecenv=None, policy=None, logger=None): else: logger = None - train_config = dict(**args["train"], env=env_name, eval=args.get("eval", {})) + train_config = dict( + **args["train"], + env=env_name, + eval=args.get("eval", {}), + safe_eval=args.get("safe_eval", {}), + env_config=args.get("env", {}), + ) if "vec" in args and "num_workers" in args["vec"]: train_config["num_workers"] = args["vec"]["num_workers"] pufferl = PuffeRL(train_config, vecenv, policy, logger) @@ -1322,6 +1487,81 @@ def eval(env_name, args=None, vecenv=None, policy=None): frames.append("Done") +def 
safe_eval(env_name, args=None, vecenv=None, policy=None): + """Evaluate policy with safe/law-abiding reward conditioning and output metrics.""" + args = args or load_config(env_name) + + safe_eval_config = args.get("safe_eval", {}) + args["vec"] = dict(backend="PufferEnv", num_envs=1) + args["env"]["num_agents"] = safe_eval_config.get("num_agents", 64) + if "episode_length" in safe_eval_config: + args["env"]["episode_length"] = safe_eval_config["episode_length"] + if "min_goal_distance" in safe_eval_config: + args["env"]["min_goal_distance"] = safe_eval_config["min_goal_distance"] + if "max_goal_distance" in safe_eval_config: + args["env"]["max_goal_distance"] = safe_eval_config["max_goal_distance"] + # Disable map resampling during eval — episodes must complete to generate metrics. + # resample_frequency < episode_length would destroy envs before episodes finish. + args["env"]["resample_frequency"] = 0 + + vecenv = vecenv or load_env(env_name, args) + policy = policy or load_policy(args, vecenv, env_name) + policy.eval() + + num_episodes = args.get("safe_eval", {}).get("num_episodes", 300) + episode_length = args["env"].get("episode_length", 300) + device = args["train"]["device"] + num_agents = vecenv.observation_space.shape[0] + use_rnn = args["train"]["use_rnn"] + + ob, _ = vecenv.reset() + state = {} + dones = torch.zeros(num_agents, device=device) + prev_rewards = torch.zeros(num_agents, device=device) + if use_rnn: + state = dict( + lstm_h=torch.zeros(num_agents, policy.hidden_size, device=device), + lstm_c=torch.zeros(num_agents, policy.hidden_size, device=device), + ) + + all_stats = defaultdict(list) + episodes_collected = 0 + # Run until we collect enough episode completions + max_steps = (num_episodes // max(num_agents, 1) + 2) * episode_length + for step in range(max_steps): + if episodes_collected >= num_episodes: + break + with torch.no_grad(): + ob_t = torch.as_tensor(ob).to(device) + if use_rnn: + state["reward"] = prev_rewards + state["done"] = 
dones + logits, value = policy.forward_eval(ob_t, state) + action, logprob, _ = pufferlib.pytorch.sample_logits(logits) + action = action.cpu().numpy().reshape(vecenv.action_space.shape) + + ob, rewards, terminals, truncations, infos = vecenv.step(action) + prev_rewards = torch.as_tensor(rewards).float().to(device) + dones = torch.as_tensor(np.maximum(terminals, truncations)).float().to(device) + for entry in infos: + if isinstance(entry, dict): + episodes_collected += int(entry.get("n", 1)) + for k, v in entry.items(): + try: + float(v) + all_stats[k].append(v) + except (TypeError, ValueError): + pass + metrics = {k: float(np.mean(v)) for k, v in all_stats.items() if len(v) > 0} + + print("SAFE_EVAL_METRICS_START") + print(json.dumps(metrics)) + print("SAFE_EVAL_METRICS_END") + + vecenv.close() + return metrics + + def sweep(args=None, env_name=None): args = args or load_config(env_name) if not args["wandb"] and not args["neptune"]: @@ -1518,7 +1758,7 @@ def ensure_drive_binary(): try: result = subprocess.run( - ["bash", "scripts/build_ocean.sh", "visualize", "local"], capture_output=True, text=True, timeout=300 + ["bash", "scripts/build_ocean.sh", "visualize", "fast"], capture_output=True, text=True, timeout=300 ) if result.returncode != 0: @@ -1578,7 +1818,7 @@ def load_policy(args, vecenv, env_name=""): load_path = args["load_model_path"] if load_path == "latest": - load_path = max(glob.glob(f"experiments/{env_name}*.pt"), key=os.path.getctime) + load_path = max(glob.glob(f"experiments/{env_name}*.pt")) if load_path is not None: state_dict = torch.load(load_path, map_location=device) @@ -1755,7 +1995,7 @@ def render_task(map_path): def main(): - err = "Usage: puffer [train, eval, sweep, controlled_exp, autotune, profile, export, sanity, render] [env_name] [optional args]. --help for more info" + err = "Usage: puffer [train, eval, safe_eval, sweep, controlled_exp, autotune, profile, export, sanity, render] [env_name] [optional args]. 
--help for more info" if len(sys.argv) < 3: raise pufferlib.APIUsageError(err) @@ -1779,6 +2019,8 @@ def main(): sanity(env_name=env_name) elif mode == "render": render(env_name=env_name) + elif mode == "safe_eval": + safe_eval(env_name=env_name) else: raise pufferlib.APIUsageError(err) diff --git a/pufferlib/utils.py b/pufferlib/utils.py index 0b582992c6..40b87b2381 100644 --- a/pufferlib/utils.py +++ b/pufferlib/utils.py @@ -4,101 +4,138 @@ import shutil import subprocess import json +import configparser +import tempfile -def run_human_replay_eval_in_subprocess(config, logger, global_step): - """ - Run human replay evaluation in a subprocess and log metrics to wandb. +def _normalize_device(device): + """Convert device to a string suitable for torch.load(map_location=...).""" + if isinstance(device, int): + return f"cuda:{device}" + return str(device) + + +def _get_env_reward_bound_names(ini_path="pufferlib/config/ocean/drive.ini"): + """Discover valid reward bound names from the env config section.""" + import re + + config = configparser.ConfigParser() + config.read(ini_path) + bounds = set() + for key in config["env"]: + m = re.match(r"reward_bound_(.+)_min$", key) + if m: + bounds.add(m.group(1)) + return bounds + + +def _run_eval_subprocess( + config, logger, global_step, mode, extra_args, marker_name, wandb_keys=None, results_queue=None +): + """Run an evaluation subprocess and log metrics to wandb. + Args: + config: Training config dict (must have data_dir, env) + logger: Logger with run_id and optional wandb attribute + global_step: Current global training step + mode: pufferl mode to run (e.g. "eval", "safe_eval") + extra_args: List of extra CLI args appended to the base command + marker_name: Marker prefix for JSON extraction (e.g. "WOSAC" looks for WOSAC_METRICS_START/END) + wandb_keys: If dict, maps metric keys to wandb keys. If None, logs all as eval/. + results_queue: If provided, put results on this queue instead of logging directly. 
""" - try: - run_id = logger.run_id - model_dir = os.path.join(config["data_dir"], f"{config['env']}_{run_id}") - model_files = glob.glob(os.path.join(model_dir, "model_*.pt")) + eval_name = marker_name.lower().replace("_", " ") + run_id = logger.run_id + model_dir = os.path.join(config["data_dir"], f"{config['env']}_{run_id}") + model_files = glob.glob(os.path.join(model_dir, "model_*.pt")) - if not model_files: - print("No model files found for human replay evaluation") - return + if not model_files: + print(f"No model files found for {eval_name} evaluation") + return - latest_cpt = max(model_files, key=os.path.getctime) - - # Prepare evaluation command - eval_config = config["eval"] - cmd = [ - sys.executable, - "-m", - "pufferlib.pufferl", - "eval", - config["env"], - "--load-model-path", - latest_cpt, + latest_cpt = max(model_files) + + cmd = [ + sys.executable, + "-m", + "pufferlib.pufferl", + mode, + config["env"], + "--load-model-path", + latest_cpt, + "--train.device", + _normalize_device(config.get("device", "cuda")), + ] + + cmd += extra_args + + result = subprocess.run(cmd, capture_output=True, text=True, timeout=600, cwd=os.getcwd()) + + start_marker = f"{marker_name}_METRICS_START" + end_marker = f"{marker_name}_METRICS_END" + + if result.returncode == 0: + stdout = result.stdout + has_markers = start_marker in stdout and end_marker in stdout + if has_markers: + start = stdout.find(start_marker) + len(start_marker) + end = stdout.find(end_marker) + metrics = json.loads(stdout[start:end].strip()) + + if hasattr(logger, "wandb") and logger.wandb: + if wandb_keys is not None: + payload = {wandb_keys[k]: metrics[k] for k in wandb_keys if k in metrics} + else: + payload = {f"eval/{k}": v for k, v in metrics.items()} + if payload: + payload["train_step"] = global_step + if results_queue is not None: + results_queue.put(payload) + else: + logger.wandb.log(payload) + else: + print(f"{eval_name} evaluation failed with exit code {result.returncode}: 
{result.stderr[-1000:]}") + + +def run_human_replay_eval_in_subprocess(config, logger, global_step, results_queue=None): + eval_config = config.get("eval", {}) + _run_eval_subprocess( + config, + logger, + global_step, + mode="eval", + extra_args=[ "--eval.wosac-realism-eval", "False", "--eval.human-replay-eval", "True", "--eval.human-replay-num-agents", - str(eval_config["human_replay_num_agents"]), + str(eval_config.get("human_replay_num_agents", 16)), "--eval.human-replay-control-mode", - str(eval_config["human_replay_control_mode"]), - ] - - # Run human replay evaluation in subprocess - result = subprocess.run(cmd, capture_output=True, text=True, timeout=600, cwd=os.getcwd()) - - if result.returncode == 0: - # Extract JSON from stdout between markers - stdout = result.stdout - if "HUMAN_REPLAY_METRICS_START" in stdout and "HUMAN_REPLAY_METRICS_END" in stdout: - start = stdout.find("HUMAN_REPLAY_METRICS_START") + len("HUMAN_REPLAY_METRICS_START") - end = stdout.find("HUMAN_REPLAY_METRICS_END") - json_str = stdout[start:end].strip() - human_replay_metrics = json.loads(json_str) - - # Log to wandb if available - if hasattr(logger, "wandb") and logger.wandb: - logger.wandb.log( - { - "eval/human_replay_collision_rate": human_replay_metrics["collision_rate"], - "eval/human_replay_offroad_rate": human_replay_metrics["offroad_rate"], - "eval/human_replay_completion_rate": human_replay_metrics["completion_rate"], - }, - step=global_step, - ) - else: - print(f"Human replay evaluation failed with exit code {result.returncode}: {result.stderr}") - - except subprocess.TimeoutExpired: - print("Human replay evaluation timed out") - except Exception as e: - print(f"Failed to run human replay evaluation: {e}") - - -def run_wosac_eval_in_subprocess(config, logger, global_step): - """ - Run WOSAC evaluation in a subprocess and log metrics to wandb. 
- - Args: - config: Configuration dictionary containing data_dir, env, and wosac settings - logger: Logger object with run_id and optional wandb attribute - epoch: Current training epoch - global_step: Current global training step - - Returns: - None. Prints error messages if evaluation fails. - """ - try: - run_id = logger.run_id - model_dir = os.path.join(config["data_dir"], f"{config['env']}_{run_id}") - model_files = glob.glob(os.path.join(model_dir, "model_*.pt")) - - # Prepare evaluation command - eval_config = config.get("eval", {}) - cmd = [ - sys.executable, - "-m", - "pufferlib.pufferl", - "eval", - config["env"], + str(eval_config.get("human_replay_control_mode", "control_sdc_only")), + "--eval.map-dir", + str(eval_config.get("map_dir", "resources/drive/binaries/training")), + "--env.num-maps", + str(eval_config.get("num_maps", 20)), + ], + marker_name="HUMAN_REPLAY", + wandb_keys={ + "collision_rate": "human_replay/collision_rate", + "offroad_rate": "human_replay/offroad_rate", + "completion_rate": "human_replay/completion_rate", + }, + results_queue=results_queue, + ) + + +def run_wosac_eval_in_subprocess(config, logger, global_step, results_queue=None): + eval_config = config.get("eval", {}) + _run_eval_subprocess( + config, + logger, + global_step, + mode="eval", + extra_args=[ "--eval.wosac-realism-eval", "True", "--eval.wosac-batch-size", @@ -119,74 +156,60 @@ def run_wosac_eval_in_subprocess(config, logger, global_step): str(eval_config.get("wosac_goal_radius", 2.0)), "--eval.wosac-sanity-check", str(eval_config.get("wosac_sanity_check", False)), - ] - - if not model_files: - print("No model files found for WOSAC evaluation. 
Running WOSAC with random policy.") - elif len(model_files) > 0: - latest_cpt = max(model_files, key=os.path.getctime) - cmd.extend(["--load-model-path", latest_cpt]) - - # Run WOSAC evaluation in subprocess - result = subprocess.run(cmd, capture_output=True, text=True, timeout=600, cwd=os.getcwd()) - - if result.returncode == 0: - # Extract JSON from stdout between markers - stdout = result.stdout - if "WOSAC_METRICS_START" in stdout and "WOSAC_METRICS_END" in stdout: - start = stdout.find("WOSAC_METRICS_START") + len("WOSAC_METRICS_START") - end = stdout.find("WOSAC_METRICS_END") - json_str = stdout[start:end].strip() - wosac_metrics = json.loads(json_str) - - # Log to wandb if available - if hasattr(logger, "wandb") and logger.wandb: - logger.wandb.log( - { - "eval/wosac_realism_meta_score": wosac_metrics["realism_meta_score"], - "eval/realism_meta_score_std": wosac_metrics["realism_meta_score_std"], - "eval/wosac_kinematic_metrics": wosac_metrics["kinematic_metrics"], - "eval/wosac_interactive_metrics": wosac_metrics["interactive_metrics"], - "eval/wosac_map_based_metrics": wosac_metrics["map_based_metrics"], - "eval/wosac_ade": wosac_metrics["ade"], - "eval/wosac_min_ade": wosac_metrics["min_ade"], - "eval/wosac_total_num_agents": wosac_metrics["total_num_agents"], - }, - step=global_step, - ) - else: - print(f"WOSAC evaluation failed with exit code {result.returncode}") - print(f"Error: {result.stderr}") - - # Check for memory issues - stderr_lower = result.stderr.lower() - if "out of memory" in stderr_lower or "cuda out of memory" in stderr_lower: - print("GPU out of memory. Skipping this WOSAC evaluation.") - - except subprocess.TimeoutExpired: - print("WOSAC evaluation timed out after 600 seconds") - except MemoryError as e: - print(f"WOSAC evaluation ran out of memory. 
Skipping this evaluation: {e}") - except Exception as e: - print(f"Failed to run WOSAC evaluation: {type(e).__name__}: {e}") + "--eval.wosac-aggregate-results", + str(eval_config.get("wosac_aggregate_results", True)), + "--eval.wosac-eval-mode", + str(eval_config.get("wosac_eval_mode", "policy")), + "--env.episode-length", + str(eval_config.get("wosac_episode_length", 91)), + "--eval.map-dir", + str(eval_config.get("map_dir", "resources/drive/binaries/training")), + ], + marker_name="WOSAC", + wandb_keys={ + "realism_meta_score": "wosac/realism_meta_score", + "realism_meta_score_std": "wosac/realism_meta_score_std", + "kinematic_metrics": "wosac/kinematic_metrics", + "interactive_metrics": "wosac/interactive_metrics", + "map_based_metrics": "wosac/map_based_metrics", + "ade": "wosac/ade", + "min_ade": "wosac/min_ade", + "total_num_agents": "wosac/total_num_agents", + }, + results_queue=results_queue, + ) def render_videos( - config, env_cfg, run_id, wandb_log, epoch, global_step, bin_path, render_async, render_queue=None, wandb_run=None + config, + env_cfg, + run_id, + wandb_log, + epoch, + global_step, + bin_path, + render_async, + render_queue=None, + wandb_run=None, + config_path=None, + wandb_prefix="render", ): """ Generate and log training videos using C-based rendering. Args: config: Configuration dictionary containing data_dir, env, and render settings - vecenv: Vectorized environment with driver_env attribute - logger: Logger object with run_id and optional wandb attribute + env_cfg: Environment config object (driver_env) with map_dir, num_maps, etc. + run_id: Wandb/Neptune run identifier + wandb_log: Whether to log videos to wandb epoch: Current training epoch global_step: Current global training step bin_path: Path to the exported .bin model weights file - - Returns: - None. Prints error messages if rendering fails. 
+ render_async: Whether rendering is async (uses render_queue) + render_queue: Queue for async render results + wandb_run: Wandb run object for sync logging + config_path: Optional path to alternative INI config file for the visualize binary + wandb_prefix: Prefix for wandb keys (e.g. "render" or "eval") """ if not os.path.exists(bin_path): print(f"Binary weights file does not exist: {bin_path}") @@ -194,156 +217,247 @@ def render_videos( model_dir = os.path.join(config["data_dir"], f"{config['env']}_{run_id}") - # Now call the C rendering function - try: - # Create output directory for videos - video_output_dir = os.path.join(model_dir, "videos") - os.makedirs(video_output_dir, exist_ok=True) - - # TODO: Fix memory leaks so that this is not needed - # Suppress AddressSanitizer exit code (temp) - env_vars = os.environ.copy() - env_vars["ASAN_OPTIONS"] = "exitcode=0" - - # Base command with only visualization flags (env config comes from INI) - base_cmd = ["xvfb-run", "-a", "-s", "-screen 0 1280x720x24", "./visualize"] - - # Visualization config flags only - if config.get("show_grid", False): - base_cmd.append("--show-grid") - if config.get("obs_only", False): - base_cmd.append("--obs-only") - if config.get("show_lasers", False): - base_cmd.append("--lasers") - if config.get("show_human_logs", False): - base_cmd.append("--show-human-logs") - if config.get("zoom_in", False): - base_cmd.append("--zoom-in") - - # Frame skip for rendering performance - frame_skip = config.get("frame_skip", 1) - if frame_skip > 1: - base_cmd.extend(["--frame-skip", str(frame_skip)]) - - # View mode - view_mode = config.get("view_mode", "both") - base_cmd.extend(["--view", view_mode]) - - # Get num_maps if available - if env_cfg is not None and getattr(env_cfg, "num_maps", None): - base_cmd.extend(["--num-maps", str(env_cfg.num_maps)]) - - base_cmd.extend(["--policy-name", bin_path]) - - # Handle single or multiple map rendering - render_maps = config.get("render_map", None) - if 
render_maps is None or render_maps == "none": - # Pick a random map from the training map_dir - map_dir = None - if env_cfg is not None and hasattr(env_cfg, "map_dir"): - map_dir = env_cfg.map_dir - if map_dir and os.path.isdir(map_dir): - import random - - bin_files = [f for f in os.listdir(map_dir) if f.endswith(".bin")] - if bin_files: - render_maps = [os.path.join(map_dir, random.choice(bin_files))] - else: - print(f"Warning: No .bin files found in {map_dir}, skipping render") - return + video_output_dir = os.path.join(model_dir, "videos") + os.makedirs(video_output_dir, exist_ok=True) + + # TODO: Fix memory leaks so that this is not needed + env_vars = os.environ.copy() + env_vars["ASAN_OPTIONS"] = "exitcode=0" + + base_cmd = ["xvfb-run", "-a", "-s", "-screen 0 1280x720x24", "./visualize"] + + if config_path: + base_cmd.extend(["--config", config_path]) + + if config.get("show_grid", False): + base_cmd.append("--show-grid") + if config.get("obs_only", False): + base_cmd.append("--obs-only") + if config.get("show_lasers", False): + base_cmd.append("--lasers") + if config.get("show_human_logs", False): + base_cmd.append("--show-human-logs") + if config.get("zoom_in", False): + base_cmd.append("--zoom-in") + + frame_skip = config.get("frame_skip", 1) + if frame_skip > 1: + base_cmd.extend(["--frame-skip", str(frame_skip)]) + + view_mode = config.get("view_mode", "both") + base_cmd.extend(["--view", view_mode]) + + if env_cfg is not None and getattr(env_cfg, "num_maps", None): + base_cmd.extend(["--num-maps", str(env_cfg.num_maps)]) + + base_cmd.extend(["--policy-name", bin_path]) + + # Handle single or multiple map rendering + render_maps = config.get("render_map", None) + if render_maps is None or render_maps == "none": + map_dir = None + if env_cfg is not None and hasattr(env_cfg, "map_dir"): + map_dir = env_cfg.map_dir + if map_dir and os.path.isdir(map_dir): + import random + + bin_files = [f for f in os.listdir(map_dir) if f.endswith(".bin")] + if bin_files: 
+ render_maps = [os.path.join(map_dir, random.choice(bin_files))] else: - print(f"Warning: map_dir not found or invalid ({map_dir}), skipping render") + print(f"Warning: No .bin files found in {map_dir}, skipping render") return - elif isinstance(render_maps, (str, os.PathLike)): - render_maps = [render_maps] else: - # Ensure list-like - render_maps = list(render_maps) - - # Collect videos to log as lists so W&B shows all in the same step - videos_to_log_world = [] - videos_to_log_agent = [] - generated_videos = {"output_topdown": [], "output_agent": []} - output_topdown = f"resources/drive/output_topdown_{epoch}" - output_agent = f"resources/drive/output_agent_{epoch}" - - for i, map_path in enumerate(render_maps): - cmd = list(base_cmd) # copy - if map_path is not None and os.path.exists(map_path): - cmd.extend(["--map-name", str(map_path)]) - - output_topdown_map = output_topdown + (f"_map{i:02d}.mp4" if len(render_maps) > 1 else ".mp4") - output_agent_map = output_agent + (f"_map{i:02d}.mp4" if len(render_maps) > 1 else ".mp4") - - # Output paths (overwrite each iteration; then moved/renamed) - cmd.extend(["--output-topdown", output_topdown_map]) - cmd.extend(["--output-agent", output_agent_map]) - - result = subprocess.run(cmd, cwd=os.getcwd(), capture_output=True, text=True, timeout=1200, env=env_vars) - - vids_exist = os.path.exists(output_topdown_map) and os.path.exists(output_agent_map) - - if result.returncode == 0 or (result.returncode == 1 and vids_exist): - videos = [ - ( - "output_topdown", - output_topdown_map, - f"epoch_{epoch:06d}_map{i:02d}_topdown.mp4" if map_path else f"epoch_{epoch:06d}_topdown.mp4", - ), - ( - "output_agent", - output_agent_map, - f"epoch_{epoch:06d}_map{i:02d}_agent.mp4" if map_path else f"epoch_{epoch:06d}_agent.mp4", - ), - ] - - for vid_type, source_vid, target_filename in videos: - if os.path.exists(source_vid): - target_path = os.path.join(video_output_dir, target_filename) - shutil.move(source_vid, target_path) - 
generated_videos[vid_type].append(target_path) - if render_async: - continue - # Accumulate for a single wandb.log call - if wandb_log: - import wandb - - if "topdown" in target_filename: - videos_to_log_world.append(wandb.Video(target_path, format="mp4")) - else: - videos_to_log_agent.append(wandb.Video(target_path, format="mp4")) - else: - print(f"Video generation completed but {source_vid} not found") - if result.stdout: - print(f"StdOUT: {result.stdout}") - if result.stderr: - print(f"StdERR: {result.stderr}") - else: - print(f"C rendering failed (map index {i}) with exit code {result.returncode}: {result.stdout}") - - if render_async: - render_queue.put( - { - "videos": generated_videos, - "step": global_step, - } - ) - - # Log all videos at once so W&B keeps all of them under the same step - if wandb_log and (videos_to_log_world or videos_to_log_agent) and not render_async: - payload = {} - if videos_to_log_world: - payload["render/world_state"] = videos_to_log_world - if videos_to_log_agent: - payload["render/agent_view"] = videos_to_log_agent - wandb_run.log(payload, step=global_step) - - except subprocess.TimeoutExpired: - print("C rendering timed out") - except Exception as e: - print(f"Failed to generate GIF: {e}") - - finally: - # Clean up bin weights file - if os.path.exists(bin_path): - os.remove(bin_path) + print(f"Warning: map_dir not found or invalid ({map_dir}), skipping render") + return + elif isinstance(render_maps, (str, os.PathLike)): + render_maps = [render_maps] + else: + render_maps = list(render_maps) + + file_prefix = f"{wandb_prefix}_" if wandb_prefix != "render" else "" + videos_to_log_world = [] + videos_to_log_agent = [] + generated_videos = {"output_topdown": [], "output_agent": []} + output_topdown = f"resources/drive/{file_prefix}output_topdown_{epoch}" + output_agent = f"resources/drive/{file_prefix}output_agent_{epoch}" + + for i, map_path in enumerate(render_maps): + cmd = list(base_cmd) + if map_path is not None and 
os.path.exists(map_path): + cmd.extend(["--map-name", str(map_path)]) + + output_topdown_map = output_topdown + (f"_map{i:02d}.mp4" if len(render_maps) > 1 else ".mp4") + output_agent_map = output_agent + (f"_map{i:02d}.mp4" if len(render_maps) > 1 else ".mp4") + + cmd.extend(["--output-topdown", output_topdown_map]) + cmd.extend(["--output-agent", output_agent_map]) + + result = subprocess.run(cmd, cwd=os.getcwd(), capture_output=True, text=True, timeout=3600, env=env_vars) + + vids_exist = os.path.exists(output_topdown_map) and os.path.exists(output_agent_map) + + if result.returncode == 0 or (result.returncode == 1 and vids_exist): + videos = [ + ( + "output_topdown", + output_topdown_map, + f"{file_prefix}epoch_{epoch:06d}_map{i:02d}_topdown.mp4" + if map_path + else f"{file_prefix}epoch_{epoch:06d}_topdown.mp4", + ), + ( + "output_agent", + output_agent_map, + f"{file_prefix}epoch_{epoch:06d}_map{i:02d}_agent.mp4" + if map_path + else f"{file_prefix}epoch_{epoch:06d}_agent.mp4", + ), + ] + + for vid_type, source_vid, target_filename in videos: + if os.path.exists(source_vid): + target_path = os.path.join(video_output_dir, target_filename) + shutil.move(source_vid, target_path) + generated_videos[vid_type].append(target_path) + if render_async: + continue + if wandb_log: + import wandb + + if "topdown" in target_filename: + videos_to_log_world.append(wandb.Video(target_path, format="mp4")) + else: + videos_to_log_agent.append(wandb.Video(target_path, format="mp4")) + else: + print(f"Video generation completed but {source_vid} not found") + if result.stdout: + print(f"StdOUT: {result.stdout}") + if result.stderr: + print(f"StdERR: {result.stderr}") + else: + print(f"C rendering failed (map index {i}) with exit code {result.returncode}: {result.stdout}") + + if render_async: + render_queue.put( + { + "videos": generated_videos, + "step": global_step, + "wandb_prefix": wandb_prefix, + "bin_path": bin_path, + "config_path": config_path, + } + ) + + if wandb_log and 
(videos_to_log_world or videos_to_log_agent) and not render_async: + payload = {} + if videos_to_log_world: + payload[f"{wandb_prefix}/world_state"] = videos_to_log_world + if videos_to_log_agent: + payload[f"{wandb_prefix}/agent_view"] = videos_to_log_agent + payload["train_step"] = global_step + wandb_run.log(payload) + + +def generate_safe_eval_ini(safe_eval_config, base_ini_path="pufferlib/config/ocean/drive.ini"): + """Generate a temporary ini file with safe/law-abiding reward conditioning values. + + Sets reward_randomization=1 with min=max bounds so the conditioning values + are deterministically set to the safe values the policy sees in its observation. + """ + config = configparser.ConfigParser() + config.read(base_ini_path) + + valid_bounds = _get_env_reward_bound_names(base_ini_path) + for key, val in safe_eval_config.items(): + if key not in valid_bounds: + continue + val = str(val) + config.set("env", f"reward_bound_{key}_min", val) + config.set("env", f"reward_bound_{key}_max", val) + + config.set("env", "reward_randomization", "1") + config.set("env", "reward_conditioning", "1") + + # Match the metrics subprocess setup so the render shows the same behavior + config.set("env", "episode_length", str(safe_eval_config.get("episode_length", 1000))) + config.set("env", "resample_frequency", "0") + config.set("env", "num_agents", str(safe_eval_config.get("num_agents", 64))) + config.set("env", "min_goal_distance", str(safe_eval_config.get("min_goal_distance", 0.5))) + config.set("env", "max_goal_distance", str(safe_eval_config.get("max_goal_distance", 1000.0))) + + fd, tmp_path = tempfile.mkstemp(suffix=".ini", prefix="safe_eval_") + with os.fdopen(fd, "w") as f: + config.write(f) + + return tmp_path + + +def generate_human_replay_ini(eval_config, base_ini_path="pufferlib/config/ocean/drive.ini"): + """Generate a temporary ini file for human replay rendering. 
+ + Sets control_mode to control_sdc_only so only the SDC is policy-controlled, + with all other agents replaying logged trajectories. + """ + config = configparser.ConfigParser() + config.read(base_ini_path) + + config.set("env", "control_mode", '"control_sdc_only"') + config.set("env", "init_mode", '"create_all_valid"') + config.set("env", "init_steps", "10") + # Use eval map_dir (waymo maps), not training map_dir + map_dir = eval_config.get("map_dir", "resources/drive/binaries/training") + config.set("env", "map_dir", f'"{map_dir}"') + + fd, tmp_path = tempfile.mkstemp(suffix=".ini", prefix="human_replay_") + with os.fdopen(fd, "w") as f: + config.write(f) + + return tmp_path + + +def run_safe_eval_metrics_in_subprocess(config, logger, global_step, safe_eval_config, results_queue=None): + """Run policy evaluation with safe reward conditioning in a subprocess and log metrics.""" + num_episodes = safe_eval_config.get("num_episodes", 300) + + # Forward training env's map_dir and num_maps so the subprocess uses the + # same maps as training (the default INI may point elsewhere). 
+ env_config = config.get("env_config", {}) + extra_args = [ + "--env.reward-randomization", + "1", + "--env.reward-conditioning", + "1", + "--safe-eval.num-episodes", + str(num_episodes), + "--safe-eval.num-agents", + str(safe_eval_config.get("num_agents", 64)), + f"--env.map-dir={env_config.get('map_dir', 'resources/drive/binaries/training')}", + f"--env.num-maps={env_config.get('num_maps', 100)}", + ] + + # Pass safe_eval overrides that safe_eval() applies to env config + safe_eval_overrides = ["episode_length", "min_goal_distance", "max_goal_distance"] + for key in safe_eval_overrides: + if key in safe_eval_config: + cli_key = key.replace("_", "-") + extra_args.extend([f"--safe-eval.{cli_key}", str(safe_eval_config[key])]) + + valid_bounds = _get_env_reward_bound_names() + for key, val in safe_eval_config.items(): + if key not in valid_bounds: + continue + val = str(val) + cli_name = key.replace("_", "-") + # Use = syntax to avoid argparse interpreting negative values as flags + extra_args.extend([f"--env.reward-bound-{cli_name}-min={val}", f"--env.reward-bound-{cli_name}-max={val}"]) + + _run_eval_subprocess( + config, + logger, + global_step, + mode="safe_eval", + extra_args=extra_args, + marker_name="SAFE_EVAL", + results_queue=results_queue, + ) diff --git a/scripts/cluster_configs/nyu_greene.yaml b/scripts/cluster_configs/nyu_greene.yaml index 06dd3a3b46..a82cd2659a 100644 --- a/scripts/cluster_configs/nyu_greene.yaml +++ b/scripts/cluster_configs/nyu_greene.yaml @@ -6,5 +6,4 @@ cpus: 16 mem: 32gb time: 360 # minutes gpu_type: null # rtx8000, a100, v100 (optional, uses partition default) -exclude: "" nodelist: null diff --git a/scripts/submit_cluster.py b/scripts/submit_cluster.py index 7c697c0692..597931819c 100644 --- a/scripts/submit_cluster.py +++ b/scripts/submit_cluster.py @@ -91,7 +91,7 @@ def parse_args(): parser.add_argument( "--container_overlay", type=str, - default="/scratch/ev2237/containers/pufferdrive/overlay.ext3", + 
default="/scratch/ev2237/containers/pufferdrive-overlay.ext3", help="Singularity overlay path", ) @@ -223,20 +223,25 @@ def submit(args, job_name: str, command: List[str], save_dir: str, dry: bool): if from_config.get("nodelist") is not None: additional_parameters["nodelist"] = from_config["nodelist"] - executor.update_parameters( + params = dict( slurm_account=from_config.get("account"), slurm_partition=from_config.get("partition"), cpus_per_task=from_config.get("cpus", 8) // args.task_per_node, tasks_per_node=args.task_per_node, nodes=from_config.get("nodes", 1), slurm_gres=gres, - slurm_exclude=from_config.get("exclude", ""), slurm_mem=from_config.get("mem"), slurm_time=from_config.get("time", 60), slurm_job_name=job_name, slurm_additional_parameters=additional_parameters, ) + exclude = from_config.get("exclude", "") + if exclude: + params["slurm_exclude"] = exclude + + executor.update_parameters(**params) + def launch_training(args, from_config, cmd, save_dir, project_root, container_config=None): """Runs inside the SLURM allocation.""" import os
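The subprocess evaluators in this patch communicate results back to the trainer through a simple sentinel protocol: the child prints a JSON dict between `<MARKER>_METRICS_START` and `<MARKER>_METRICS_END`, and `_run_eval_subprocess` slices the child's stdout between those markers. A minimal, standalone sketch of that extraction logic (the helper name `extract_metrics` is illustrative, not part of the patch):

```python
import json


def extract_metrics(stdout, marker):
    """Extract the JSON metrics blob printed between sentinel markers.

    Mirrors the marker protocol used by _run_eval_subprocess: the child
    process prints e.g. SAFE_EVAL_METRICS_START, a JSON dict, then
    SAFE_EVAL_METRICS_END; the parent slices stdout between them.
    Returns None when either marker is missing (e.g. the child crashed
    before printing metrics).
    """
    start_marker = f"{marker}_METRICS_START"
    end_marker = f"{marker}_METRICS_END"
    if start_marker not in stdout or end_marker not in stdout:
        return None
    start = stdout.find(start_marker) + len(start_marker)
    end = stdout.find(end_marker)
    return json.loads(stdout[start:end].strip())


# Example child output shaped like what safe_eval() prints:
out = 'some log noise\nSAFE_EVAL_METRICS_START\n{"collision_rate": 0.02}\nSAFE_EVAL_METRICS_END\n'
print(extract_metrics(out, "SAFE_EVAL"))  # {'collision_rate': 0.02}
```

Printing to stdout keeps the child decoupled from the parent's logger; the markers make the protocol robust to arbitrary log noise before and after the JSON blob, at the cost of failing silently if the child exits cleanly without printing them.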