From ced67e257e41fe54bed445195f67281d3bbf294e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wa=C3=ABl=20Doulazmi?= Date: Fri, 16 Jan 2026 18:23:39 +0000 Subject: [PATCH 01/10] Add a Flag to build_ocean so Raylib can work on Debian 11 --- scripts/build_ocean.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/scripts/build_ocean.sh b/scripts/build_ocean.sh index f090291120..a737f5654d 100755 --- a/scripts/build_ocean.sh +++ b/scripts/build_ocean.sh @@ -101,6 +101,7 @@ FLAGS=( $LINK_ARCHIVES -lm -lpthread + -ldl $ERROR_LIMIT_FLAG -DPLATFORM_DESKTOP ) From 7ee74290d2aa1651418d1333fb0a2f7ec172c5a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wa=C3=ABl=20Doulazmi?= Date: Fri, 20 Feb 2026 00:03:33 +0100 Subject: [PATCH 02/10] WIP: refactor of the eval utilities. Still need to decide on some design choices, like using a subprocess or not, separating eval from rendering... --- pufferlib/ocean/drive/binding.c | 20 ++++++++++++-------- pufferlib/ocean/drive/drive.py | 15 ++++++++++----- pufferlib/pufferl.py | 6 ++++++ 3 files changed, 28 insertions(+), 13 deletions(-) diff --git a/pufferlib/ocean/drive/binding.c b/pufferlib/ocean/drive/binding.c index 76025de688..7e1281bd04 100644 --- a/pufferlib/ocean/drive/binding.c +++ b/pufferlib/ocean/drive/binding.c @@ -91,6 +91,10 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { float min_goal_distance = unpack(kwargs, "min_goal_distance"); float max_goal_distance = unpack(kwargs, "max_goal_distance"); + int eval_batch_size = unpack(kwargs, "eval_batch_size"); + int eval_map_counter = unpack(kwargs, "eval_map_counter"); + bool eval_mode = eval_batch_size > 0; + float reward_bound_goal_radius_min = unpack(kwargs, "reward_bound_goal_radius_min"); float reward_bound_goal_radius_max = unpack(kwargs, "reward_bound_goal_radius_max"); float reward_bound_collision_min = unpack(kwargs, "reward_bound_collision_min"); @@ -124,20 +128,18 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { float 
reward_bound_acc_min = unpack(kwargs, "reward_bound_acc_min"); float reward_bound_acc_max = unpack(kwargs, "reward_bound_acc_max"); - int use_all_maps = unpack(kwargs, "use_all_maps"); - clock_gettime(CLOCK_REALTIME, &ts); srand(ts.tv_nsec); int total_agent_count = 0; int env_count = 0; - int max_envs = use_all_maps ? num_maps : num_agents; - int map_idx = 0; + int max_envs = eval_mode ? eval_batch_size : num_agents; + int map_idx = eval_map_counter; int maps_checked = 0; PyObject *agent_offsets = PyList_New(max_envs + 1); PyObject *map_ids = PyList_New(max_envs); // getting env count - while (use_all_maps ? map_idx < max_envs : total_agent_count < num_agents && env_count < max_envs) { - int map_id = use_all_maps ? map_idx++ : rand() % num_maps; + while (total_agent_count < num_agents && env_count < max_envs) { + int map_id = eval_mode ? map_idx++ : rand() % num_maps; Drive *env = calloc(1, sizeof(Drive)); env->init_mode = init_mode; env->control_mode = control_mode; @@ -181,7 +183,7 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { // Skip map if it doesn't contain any controllable agents if (env->active_agent_count == 0) { - if (!use_all_maps) { + if (!eval_mode) { maps_checked++; // Safeguard: if we've checked all available maps and found no active agents, raise an error @@ -248,7 +250,9 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { } // printf("Generated %d environments to cover %d agents (requested %d agents)\n", env_count, total_agent_count, // num_agents); - if (!use_all_maps && total_agent_count >= num_agents) { + // NOTE: even in eval we want a fixed value of num_agents now, else you cannot batch. 
+ // I still need to think about what we do with the cropped scenario + if (total_agent_count >= num_agents) { total_agent_count = num_agents; } PyObject *final_total_agent_count = PyLong_FromLong(total_agent_count); diff --git a/pufferlib/ocean/drive/drive.py b/pufferlib/ocean/drive/drive.py index 9d80b14a19..fd51a4922d 100644 --- a/pufferlib/ocean/drive/drive.py +++ b/pufferlib/ocean/drive/drive.py @@ -47,8 +47,8 @@ def __init__( init_mode="create_all_valid", control_mode="control_vehicles", map_dir="resources/drive/binaries/training", - use_all_maps=False, allow_fewer_maps=True, + eval_batch_size=-1, # reward randomization bounds reward_bound_goal_radius_min=2.0, reward_bound_goal_radius_max=12.0, @@ -109,6 +109,7 @@ def __init__( self.termination_mode = termination_mode self.resample_frequency = resample_frequency self.dynamics_model = dynamics_model + self.eval_batch_size = eval_batch_size # reward randomization bounds self.reward_bound_goal_radius_min = reward_bound_goal_radius_min self.reward_bound_collision_min = reward_bound_collision_min @@ -242,6 +243,8 @@ def __init__( f"Please reduce num_maps, add more maps to {map_dir}, or set allow_fewer_maps=True." 
) + # This is to track on which maps the evaluation is running + self.eval_map_counter = 0 # Iterate through all maps to count total agents that can be initialized for each map agent_offsets, map_ids, num_envs = binding.shared( map_files=self.map_files, @@ -255,6 +258,7 @@ def __init__( reward_conditioning=self.reward_conditioning, min_goal_distance=self.min_goal_distance, max_goal_distance=self.max_goal_distance, + eval_batch_size=self.eval_batch_size, reward_bound_goal_radius_min=self.reward_bound_goal_radius_min, reward_bound_goal_radius_max=self.reward_bound_goal_radius_max, reward_bound_collision_min=self.reward_bound_collision_min, @@ -287,14 +291,14 @@ def __init__( reward_bound_steer_max=self.reward_bound_steer_max, reward_bound_acc_min=self.reward_bound_acc_min, reward_bound_acc_max=self.reward_bound_acc_max, - use_all_maps=use_all_maps, ) - # agent_offsets[-1] works in both cases, just making it explicit that num_agents is ignored if use_all_maps - self.num_agents = num_agents if not use_all_maps else agent_offsets[-1] + self.num_agents = num_agents self.agent_offsets = agent_offsets self.map_ids = map_ids self.num_envs = num_envs + # NOTE: What will happen if the map_counter becomes higher than num_maps ? 
+ self.eval_map_counter += num_envs super().__init__(buf=buf) env_ids = [] for i in range(num_envs): @@ -403,6 +407,7 @@ def step(self, actions): reward_conditioning=self.reward_conditioning, min_goal_distance=self.min_goal_distance, max_goal_distance=self.max_goal_distance, + eval_batch_size=self.eval_batch_size, # reward randomization bounds reward_bound_collision_min=self.reward_bound_collision_min, reward_bound_goal_radius_min=self.reward_bound_goal_radius_min, @@ -436,11 +441,11 @@ def step(self, actions): reward_bound_steer_max=self.reward_bound_steer_max, reward_bound_acc_min=self.reward_bound_acc_min, reward_bound_acc_max=self.reward_bound_acc_max, - use_all_maps=False, ) self.agent_offsets = agent_offsets self.map_ids = map_ids self.num_envs = num_envs + self.eval_map_counter += num_envs env_ids = [] seed = np.random.randint(0, 2**32 - 1) for i in range(num_envs): diff --git a/pufferlib/pufferl.py b/pufferlib/pufferl.py index 1c80de553b..d0f76c10bc 100644 --- a/pufferlib/pufferl.py +++ b/pufferlib/pufferl.py @@ -587,6 +587,12 @@ def train(self): ): pufferlib.utils.run_human_replay_eval_in_subprocess(self.config, self.logger, self.global_step) + # For now I add a 3rd eval function, goal is to later unify everything. + if self.config["eval"]["eval_batch"] and ( + self.epoch % self.config["eval"]["eval_interval"] == 0 or done_training + ): + pufferlib.utils.run_human_replay_eval_in_subprocess(self.config, self.logger, self.global_step) + def check_render_queue(self): """Check if any async render jobs finished and log them.""" if not self.render_async or not hasattr(self, "render_queue"): From 31097476ea606e46f2e9fc101179a2a998188257 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wa=C3=ABl=20Doulazmi?= Date: Sat, 28 Feb 2026 18:08:58 +0100 Subject: [PATCH 03/10] Implement eval_womd function, in a way it will be easy to add multiprocessing. 
Next steps: - Add multiprocessing - Save logs in a nice csv (with nice filenames and stuff) - Add a nice looking tqdm - Add support to run this during training - Add support for Gigaflow and eventually WOSAC --- pufferlib/config/ocean/drive.ini | 8 +++- pufferlib/ocean/drive/binding.c | 11 ++++- pufferlib/ocean/drive/drive.py | 8 +++- pufferlib/ocean/env_binding.h | 47 ++++++++++++++++++++-- pufferlib/pufferl.py | 69 +++++++++++++++++++++++++++++--- 5 files changed, 130 insertions(+), 13 deletions(-) diff --git a/pufferlib/config/ocean/drive.ini b/pufferlib/config/ocean/drive.ini index b9d3bac319..6a1d776e78 100644 --- a/pufferlib/config/ocean/drive.ini +++ b/pufferlib/config/ocean/drive.ini @@ -164,7 +164,13 @@ eval_interval = 1000 ; Path to dataset used for evaluation map_dir = "resources/drive/binaries/training" ; Evaluation will run on the first num_maps maps in the map_dir directory -num_maps = 20 +num_agents = 512 +num_maps=29 +eval_batch_size = 10 + +; Put control_sdc_only for log-replay, "control_vehicles" for self-play +control_mode = "control_vehicles" + backend = PufferEnv ; WOSAC (Waymo Open Sim Agents Challenge) evaluation settings ; If True, enables evaluation on realism metrics each time we save a checkpoint diff --git a/pufferlib/ocean/drive/binding.c b/pufferlib/ocean/drive/binding.c index 7e1281bd04..de65baf8ba 100644 --- a/pufferlib/ocean/drive/binding.c +++ b/pufferlib/ocean/drive/binding.c @@ -132,14 +132,21 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { srand(ts.tv_nsec); int total_agent_count = 0; int env_count = 0; - int max_envs = eval_mode ? eval_batch_size : num_agents; + int max_envs = num_agents; + + // The following lines should be modified when we want to do eval in GIGAFLOW + if (eval_mode) { + int remaining_maps = num_maps - eval_map_counter; + max_envs = eval_batch_size < remaining_maps ? 
eval_batch_size : remaining_maps; + } + int map_idx = eval_map_counter; int maps_checked = 0; PyObject *agent_offsets = PyList_New(max_envs + 1); PyObject *map_ids = PyList_New(max_envs); // getting env count while (total_agent_count < num_agents && env_count < max_envs) { - int map_id = eval_mode ? map_idx++ : rand() % num_maps; + int map_id = (eval_mode ? map_idx++ : rand()) % num_maps; Drive *env = calloc(1, sizeof(Drive)); env->init_mode = init_mode; env->control_mode = control_mode; diff --git a/pufferlib/ocean/drive/drive.py b/pufferlib/ocean/drive/drive.py index fd51a4922d..f6348f8605 100644 --- a/pufferlib/ocean/drive/drive.py +++ b/pufferlib/ocean/drive/drive.py @@ -110,6 +110,7 @@ def __init__( self.resample_frequency = resample_frequency self.dynamics_model = dynamics_model self.eval_batch_size = eval_batch_size + self.eval_mode = eval_batch_size > 0 # reward randomization bounds self.reward_bound_goal_radius_min = reward_bound_goal_radius_min self.reward_bound_collision_min = reward_bound_collision_min @@ -259,6 +260,7 @@ def __init__( min_goal_distance=self.min_goal_distance, max_goal_distance=self.max_goal_distance, eval_batch_size=self.eval_batch_size, + eval_map_counter=self.eval_map_counter, reward_bound_goal_radius_min=self.reward_bound_goal_radius_min, reward_bound_goal_radius_max=self.reward_bound_goal_radius_max, reward_bound_collision_min=self.reward_bound_collision_min, @@ -388,10 +390,13 @@ def step(self, actions): self.tick += 1 info = [] if self.tick % self.report_interval == 0: - log = binding.vec_log(self.c_envs, self.num_agents) + log = binding.vec_log(self.c_envs, self.num_agents, self.eval_mode) if log: info.append(log) # print(log) + # In eval mode, I don't want to resample after the last batch (womd specific) + if self.eval_mode and self.eval_map_counter >= self.num_maps: + self.resample_frequency = 0 if self.tick > 0 and self.resample_frequency > 0 and self.tick % self.resample_frequency == 0: self.tick = 0 
binding.vec_close(self.c_envs) @@ -408,6 +413,7 @@ def step(self, actions): min_goal_distance=self.min_goal_distance, max_goal_distance=self.max_goal_distance, eval_batch_size=self.eval_batch_size, + eval_map_counter=self.eval_map_counter, # reward randomization bounds reward_bound_collision_min=self.reward_bound_collision_min, reward_bound_goal_radius_min=self.reward_bound_goal_radius_min, diff --git a/pufferlib/ocean/env_binding.h b/pufferlib/ocean/env_binding.h index f4544a96ff..fe0bc826e5 100644 --- a/pufferlib/ocean/env_binding.h +++ b/pufferlib/ocean/env_binding.h @@ -556,8 +556,8 @@ static int assign_to_dict(PyObject *dict, char *key, float value) { } static PyObject *vec_log(PyObject *self, PyObject *args) { - if (PyTuple_Size(args) != 2) { - PyErr_SetString(PyExc_TypeError, "vec_log requires 2 arguments"); + if (PyTuple_Size(args) != 3) { + PyErr_SetString(PyExc_TypeError, "vec_log requires 3 arguments"); return NULL; } @@ -566,13 +566,54 @@ static PyObject *vec_log(PyObject *self, PyObject *args) { return NULL; } + PyObject *eval_mode_arg = PyTuple_GetItem(args, 2); + int eval_mode = PyObject_IsTrue(eval_mode_arg); + // Iterates over logs one float at a time. Will break // horribly if Log has non-float data. 
PyObject *num_agents_arg = PyTuple_GetItem(args, 1); float num_agents = (float)PyLong_AsLong(num_agents_arg); - Log aggregate = {0}; int num_keys = sizeof(Log) / sizeof(float); + + // Core Idea: In train we want aggregated metrics, in eval we want them per-episode + if (eval_mode) { + if (vec->envs[0]->log.n == 0) { + return PyDict_New(); + } + + PyObject *list = PyList_New(vec->num_envs); + + for (int i = 0; i < vec->num_envs; i++) { + PyObject *dict = PyDict_New(); + Env *curr_env = vec->envs[i]; + float n = curr_env->log.n; + + for (int j = 0; j < num_keys; j++) { + ((float *)&curr_env->log)[j] /= n; + } + + curr_env->log.completion_rate = + curr_env->log.goals_reached_this_episode / curr_env->log.goals_sampled_this_episode; + + my_log(dict, &curr_env->log); + assign_to_dict(dict, "n", n); + + // For now I use the map_name like map_123.bin, in further commit I'll add the actual name + if (curr_env->map_name) { + PyObject *s = PyUnicode_FromString(curr_env->map_name); + if (s != NULL) { + PyDict_SetItemString(dict, "map_name", s); + Py_DECREF(s); + } + } + + PyList_SetItem(list, i, dict); + } + return list; + } + + Log aggregate = {0}; for (int i = 0; i < vec->num_envs; i++) { Env *env = vec->envs[i]; for (int j = 0; j < num_keys; j++) { diff --git a/pufferlib/pufferl.py b/pufferlib/pufferl.py index 5d90685d85..8252c9aa6c 100644 --- a/pufferlib/pufferl.py +++ b/pufferlib/pufferl.py @@ -587,12 +587,6 @@ def train(self): ): pufferlib.utils.run_human_replay_eval_in_subprocess(self.config, self.logger, self.global_step) - # For now I add a 3rd eval function, goal is to later unify everything. 
- if self.config["eval"]["eval_batch"] and ( - self.epoch % self.config["eval"]["eval_interval"] == 0 or done_training - ): - pufferlib.utils.run_human_replay_eval_in_subprocess(self.config, self.logger, self.global_step) - def check_render_queue(self): """Check if any async render jobs finished and log them.""" if not self.render_async or not hasattr(self, "render_queue"): @@ -1320,6 +1314,67 @@ def eval(env_name, args=None, vecenv=None, policy=None): frames.append("Done") +def eval_womd(env_name, args=None, vecenv=None, policy=None): + args = args or load_config(env_name) + + backend = args["eval"].get("backend", "PufferEnv") + args["vec"] = dict(backend=backend, num_envs=1) + args["env"]["control_mode"] = args["eval"]["control_mode"] + args["env"]["episode_length"] = 91 # WOMD scenario length + args["env"]["resample_frequency"] = 91 + + args["env"]["eval_batch_size"] = args["eval"]["eval_batch_size"] + args["env"]["map_dir"] = args["eval"]["map_dir"] + + # For now you set num_maps in drive.ini and you will evaluate on all the num_maps + # This is specific to WOMD, when we will also include GIGAFLOW we + num_maps = args["eval"]["num_maps"] + args["env"]["num_maps"] = args["eval"]["num_maps"] + + vecenv = vecenv or load_env(env_name, args) + policy = policy or load_policy(args, vecenv, env_name) + + num_agents = vecenv.observation_space.shape[0] + device = args["train"]["device"] + + global_infos = {} + eval_batch_size = args["eval"]["eval_batch_size"] + + for batch_start_idx in range(0, num_maps, eval_batch_size): + state = {} + if args["train"]["use_rnn"]: + state = dict( + lstm_h=torch.zeros(num_agents, policy.hidden_size, device=device), + lstm_c=torch.zeros(num_agents, policy.hidden_size, device=device), + ) + + obs, _ = vecenv.reset() + + for timestep in range(args["env"]["episode_length"]): + with torch.no_grad(): + ob_tensor = torch.as_tensor(obs).to(device) + logits, value = policy.forward_eval(ob_tensor, state) + action, logprob, _ = 
pufferlib.pytorch.sample_logits(logits) + action_np = action.cpu().numpy().reshape(vecenv.action_space.shape) + + if isinstance(logits, torch.distributions.Normal): + action_np = np.clip(action_np, vecenv.action_space.low, vecenv.action_space.high) + + obs, rewards, dones, truncs, info_list = vecenv.step(action_np) + + if len(info_list) > 0: + for result in info_list[0]: + # Identify the binary map, and just keep the name + result["map_name"] = result["map_name"].split("/")[-1].split(".")[0] + + for k, v in result.items(): + if k not in global_infos: + global_infos[k] = [] + global_infos[k].append(v) + + print(global_infos) + + def sweep(args=None, env_name=None): args = args or load_config(env_name) if not args["wandb"] and not args["neptune"]: @@ -1763,6 +1818,8 @@ def main(): train(env_name=env_name) elif mode == "eval": eval(env_name=env_name) + elif mode == "eval_womd": + eval_womd(env_name=env_name) elif mode == "sweep": sweep(env_name=env_name) elif mode == "controlled_exp": From d5819030ada53de66c09a4c45f1202fb8b3ba69b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wa=C3=ABl=20Doulazmi?= Date: Sun, 1 Mar 2026 12:31:49 +0100 Subject: [PATCH 04/10] Multiprocessing of the eval !! 
Next steps: - Make some tests on the cluster - Add a nice tqdm integration - log the results to a csv --- pufferlib/ocean/drive/binding.c | 3 +- pufferlib/ocean/drive/drive.py | 14 +++++--- pufferlib/pufferl.py | 60 ++++++++++++++++++++++++--------- 3 files changed, 57 insertions(+), 20 deletions(-) diff --git a/pufferlib/ocean/drive/binding.c b/pufferlib/ocean/drive/binding.c index de65baf8ba..8ac5c57cb6 100644 --- a/pufferlib/ocean/drive/binding.c +++ b/pufferlib/ocean/drive/binding.c @@ -93,6 +93,7 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { int eval_batch_size = unpack(kwargs, "eval_batch_size"); int eval_map_counter = unpack(kwargs, "eval_map_counter"); + int eval_last_map = unpack(kwargs, "eval_last_map"); bool eval_mode = eval_batch_size > 0; float reward_bound_goal_radius_min = unpack(kwargs, "reward_bound_goal_radius_min"); @@ -136,7 +137,7 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { // The following lines should be modified when we want to do eval in GIGAFLOW if (eval_mode) { - int remaining_maps = num_maps - eval_map_counter; + int remaining_maps = eval_last_map - eval_map_counter; max_envs = eval_batch_size < remaining_maps ? 
eval_batch_size : remaining_maps; } diff --git a/pufferlib/ocean/drive/drive.py b/pufferlib/ocean/drive/drive.py index f6348f8605..497fb90b9f 100644 --- a/pufferlib/ocean/drive/drive.py +++ b/pufferlib/ocean/drive/drive.py @@ -49,6 +49,8 @@ def __init__( map_dir="resources/drive/binaries/training", allow_fewer_maps=True, eval_batch_size=-1, + eval_starting_map=0, + eval_num_maps_to_process=1, # reward randomization bounds reward_bound_goal_radius_min=2.0, reward_bound_goal_radius_max=12.0, @@ -245,7 +247,9 @@ def __init__( ) # This is to track on which maps the evaluation is running - self.eval_map_counter = 0 + self.eval_map_counter = eval_starting_map + self.eval_num_maps_to_process = eval_num_maps_to_process + self.eval_last_map = eval_starting_map + eval_num_maps_to_process # Iterate through all maps to count total agents that can be initialized for each map agent_offsets, map_ids, num_envs = binding.shared( map_files=self.map_files, @@ -261,6 +265,7 @@ def __init__( max_goal_distance=self.max_goal_distance, eval_batch_size=self.eval_batch_size, eval_map_counter=self.eval_map_counter, + eval_last_map=self.eval_last_map, reward_bound_goal_radius_min=self.reward_bound_goal_radius_min, reward_bound_goal_radius_max=self.reward_bound_goal_radius_max, reward_bound_collision_min=self.reward_bound_collision_min, @@ -394,9 +399,9 @@ def step(self, actions): if log: info.append(log) # print(log) - # In eval mode, I don't want to resample after the last batch (womd specific) - if self.eval_mode and self.eval_map_counter >= self.num_maps: - self.resample_frequency = 0 + # In eval mode, I don't want to resample after the last batch (womd specific) + if self.eval_mode and self.eval_map_counter >= self.eval_last_map: + self.resample_frequency = 0 if self.tick > 0 and self.resample_frequency > 0 and self.tick % self.resample_frequency == 0: self.tick = 0 binding.vec_close(self.c_envs) @@ -414,6 +419,7 @@ def step(self, actions): max_goal_distance=self.max_goal_distance, 
eval_batch_size=self.eval_batch_size, eval_map_counter=self.eval_map_counter, + eval_last_map=self.eval_last_map, # reward randomization bounds reward_bound_collision_min=self.reward_bound_collision_min, reward_bound_goal_radius_min=self.reward_bound_goal_radius_min, diff --git a/pufferlib/pufferl.py b/pufferlib/pufferl.py index 8252c9aa6c..8f65a5486d 100644 --- a/pufferlib/pufferl.py +++ b/pufferlib/pufferl.py @@ -1315,10 +1315,10 @@ def eval(env_name, args=None, vecenv=None, policy=None): def eval_womd(env_name, args=None, vecenv=None, policy=None): + import copy + args = args or load_config(env_name) - backend = args["eval"].get("backend", "PufferEnv") - args["vec"] = dict(backend=backend, num_envs=1) args["env"]["control_mode"] = args["eval"]["control_mode"] args["env"]["episode_length"] = 91 # WOMD scenario length args["env"]["resample_frequency"] = 91 @@ -1330,17 +1330,46 @@ def eval_womd(env_name, args=None, vecenv=None, policy=None): # This is specific to WOMD, when we will also include GIGAFLOW we num_maps = args["eval"]["num_maps"] args["env"]["num_maps"] = args["eval"]["num_maps"] + args["env"]["num_agents"] = args["eval"]["num_agents"] - vecenv = vecenv or load_env(env_name, args) + # Multiprocessing logic (Distribute maps across workers) + num_workers = args["vec"]["num_workers"] + if num_workers > num_maps: + raise pufferlib.APIUsageError( + f"You requested to use {num_workers} workers for evaluating on only {num_maps} maps" + ) + env_kwargs = [] + maps_per_worker = num_maps // num_workers + remainder = num_maps % num_workers + current_start = 0 + for i in range(num_workers): + worker_kwargs = copy.deepcopy(args["env"]) + # Give one extra scenario to the first remainder workers + worker_num_maps = maps_per_worker + (1 if i < remainder else 0) + + worker_kwargs["eval_starting_map"] = current_start + worker_kwargs["eval_num_maps_to_process"] = worker_num_maps + env_kwargs.append(worker_kwargs) + + current_start += worker_num_maps + + # Instantiate each 
env with its own kwargs + env_module = importlib.import_module("pufferlib.ocean") + make_env = env_module.env_creator("puffer_drive") + env_creators = [make_env] * num_workers + env_args = [[]] * num_workers + + vecenv = pufferlib.vector.make(env_creators, env_args=env_args, env_kwargs=env_kwargs, **args["vec"]) policy = policy or load_policy(args, vecenv, env_name) num_agents = vecenv.observation_space.shape[0] device = args["train"]["device"] global_infos = {} - eval_batch_size = args["eval"]["eval_batch_size"] + maps_processed = 0 + obs, _ = vecenv.reset() - for batch_start_idx in range(0, num_maps, eval_batch_size): + while maps_processed < num_maps: state = {} if args["train"]["use_rnn"]: state = dict( @@ -1348,8 +1377,6 @@ def eval_womd(env_name, args=None, vecenv=None, policy=None): lstm_c=torch.zeros(num_agents, policy.hidden_size, device=device), ) - obs, _ = vecenv.reset() - for timestep in range(args["env"]["episode_length"]): with torch.no_grad(): ob_tensor = torch.as_tensor(obs).to(device) @@ -1360,19 +1387,22 @@ def eval_womd(env_name, args=None, vecenv=None, policy=None): if isinstance(logits, torch.distributions.Normal): action_np = np.clip(action_np, vecenv.action_space.low, vecenv.action_space.high) - obs, rewards, dones, truncs, info_list = vecenv.step(action_np) + obs, _, _, _, info_list = vecenv.step(action_np) if len(info_list) > 0: - for result in info_list[0]: - # Identify the binary map, and just keep the name - result["map_name"] = result["map_name"].split("/")[-1].split(".")[0] + for info_worker in info_list: + for result in info_worker: + # Identify the binary map, and just keep the name + result["map_name"] = result["map_name"].split("/")[-1].split(".")[0] - for k, v in result.items(): - if k not in global_infos: - global_infos[k] = [] - global_infos[k].append(v) + for k, v in result.items(): + if k not in global_infos: + global_infos[k] = [] + global_infos[k].append(v) + maps_processed += 1 print(global_infos) + vecenv.close() def 
sweep(args=None, env_name=None): From ad201a32511b2fbcf4dec9c5bb48daefe6bdf514 Mon Sep 17 00:00:00 2001 From: Wael Doulazmi Date: Sun, 1 Mar 2026 12:10:00 +0000 Subject: [PATCH 05/10] Proper timing and logging --- pufferlib/pufferl.py | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/pufferlib/pufferl.py b/pufferlib/pufferl.py index 8f65a5486d..28d34fb096 100644 --- a/pufferlib/pufferl.py +++ b/pufferlib/pufferl.py @@ -1236,7 +1236,7 @@ def eval(env_name, args=None, vecenv=None, policy=None): backend = args["eval"].get("backend", "PufferEnv") args["vec"] = dict(backend=backend, num_envs=1) - args["env"]["control_mode"] = args["eval"]["human_replay_control_mode"] + args["env"]["control_mode"] = args["eval"]["control_mode"] args["env"]["episode_length"] = 91 # WOMD scenario length vecenv = vecenv or load_env(env_name, args) @@ -1316,7 +1316,10 @@ def eval(env_name, args=None, vecenv=None, policy=None): def eval_womd(env_name, args=None, vecenv=None, policy=None): import copy + import time + import pandas as pd + t0 = time.time() args = args or load_config(env_name) args["env"]["control_mode"] = args["eval"]["control_mode"] @@ -1401,8 +1404,24 @@ def eval_womd(env_name, args=None, vecenv=None, policy=None): global_infos[k].append(v) maps_processed += 1 - print(global_infos) vecenv.close() + t1 = time.time() + print(f"It took {t1 - t0}s to evaluate {num_maps} maps in {args['env']['control_mode']} mode.") + + # Log the results and print the mean + df = pd.DataFrame(global_infos) + cols = ["map_name"] + [col for col in df.columns if col != "map_name"] + df = df[cols] + + # For now I do that, but ofc we should think at a way to manage files properly + df.to_csv("results.csv") + + # Average only the numbers + df_num = df.select_dtypes(include=["number"]) + df_num = df_num.mean() + + print("Average metrics: ") + print(df_num.to_string()) def sweep(args=None, env_name=None): From f296258aa54214b075724c12b224d0747cc3743b Mon Sep 17 
00:00:00 2001 From: Wael Doulazmi Date: Sun, 1 Mar 2026 15:32:20 +0000 Subject: [PATCH 06/10] Handle the last map gracefully --- pufferlib/ocean/drive/binding.c | 13 ++++++++++--- pufferlib/ocean/drive/drive.h | 14 ++++++++++++++ pufferlib/pufferl.py | 2 ++ 3 files changed, 26 insertions(+), 3 deletions(-) diff --git a/pufferlib/ocean/drive/binding.c b/pufferlib/ocean/drive/binding.c index 8ac5c57cb6..27fe0956c9 100644 --- a/pufferlib/ocean/drive/binding.c +++ b/pufferlib/ocean/drive/binding.c @@ -189,8 +189,11 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { load_map_binary(map_file_path, env); set_active_agents(env); + // In eval mode, we want all the active agents in the env (no cropped last env) + int uncomplete_last_env = eval_mode && (total_agent_count + env->active_agent_count > num_agents); + // Skip map if it doesn't contain any controllable agents - if (env->active_agent_count == 0) { + if (env->active_agent_count == 0 || uncomplete_last_env) { if (!eval_mode) { maps_checked++; @@ -231,6 +234,12 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { free(env->static_agent_indices); free(env->expert_static_agent_indices); free(env); + // In a case a map has more than num_agents the eval will break + // I don't put an assert here, but I think in a coming PR we should set MAX_AGENTS directly in python + // And we will add an assert to avoid this. + if (uncomplete_last_env) { + break; + } continue; } @@ -258,8 +267,6 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { } // printf("Generated %d environments to cover %d agents (requested %d agents)\n", env_count, total_agent_count, // num_agents); - // NOTE: even in eval we want a fixed value of num_agents now, else you cannot batch. 
- // I still need to think about what we do with the cropped scenario if (total_agent_count >= num_agents) { total_agent_count = num_agents; } diff --git a/pufferlib/ocean/drive/drive.h b/pufferlib/ocean/drive/drive.h index 519bf5cd98..df890b52b7 100644 --- a/pufferlib/ocean/drive/drive.h +++ b/pufferlib/ocean/drive/drive.h @@ -1481,10 +1481,24 @@ void set_active_agents(Drive *env) { env->num_agents = env->max_agents_in_sim; } + // If we have a SDC index (WOMD), initialize it first: + int sdc_index = env->sdc_track_index; + if (sdc_index >= 0) { + active_agent_indices[0] = sdc_index; + env->num_created_agents++; + env->active_agent_count++; + env->agents[sdc_index].active_agent = 1; + } + // Iterate through entities to find agents to create and/or control for (int i = 0; i < env->num_objects && env->num_created_agents < env->max_agents_in_sim; i++) { Agent *entity = &env->agents[i]; + // Skip if its the SDC + if (i == sdc_index) { + continue; + } + // Skip if not valid at initialization if (entity->log_valid[env->init_steps] != 1) { continue; diff --git a/pufferlib/pufferl.py b/pufferlib/pufferl.py index 28d34fb096..7ebd30ad8b 100644 --- a/pufferlib/pufferl.py +++ b/pufferlib/pufferl.py @@ -1337,6 +1337,8 @@ def eval_womd(env_name, args=None, vecenv=None, policy=None): # Multiprocessing logic (Distribute maps across workers) num_workers = args["vec"]["num_workers"] + args["vec"]["batch_size"] = num_workers + if num_workers > num_maps: raise pufferlib.APIUsageError( f"You requested to use {num_workers} workers for evaluating on only {num_maps} maps" From 6df1be926ce70036029df26511b8cefde1db2fc3 Mon Sep 17 00:00:00 2001 From: Wael Doulazmi Date: Sun, 1 Mar 2026 15:40:52 +0000 Subject: [PATCH 07/10] Add a print --- pufferlib/pufferl.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pufferlib/pufferl.py b/pufferlib/pufferl.py index 7ebd30ad8b..7289431acd 100644 --- a/pufferlib/pufferl.py +++ b/pufferlib/pufferl.py @@ -1415,6 +1415,9 @@ def eval_womd(env_name, 
args=None, vecenv=None, policy=None): cols = ["map_name"] + [col for col in df.columns if col != "map_name"] df = df[cols] + num_unique_maps = len(df["map_name"].unique()) + print(f"The logs contain {num_unique_maps} unique maps.") + # For now I do that, but ofc we should think at a way to manage files properly df.to_csv("results.csv") From f2c8888a3c35d6d23dfc5190dd03ba95a5d08073 Mon Sep 17 00:00:00 2001 From: Wael Doulazmi Date: Sun, 1 Mar 2026 16:25:50 +0000 Subject: [PATCH 08/10] Handle zombie envs gracefully --- pufferlib/config/ocean/drive.ini | 4 ++-- pufferlib/ocean/drive/drive.py | 5 +++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/pufferlib/config/ocean/drive.ini b/pufferlib/config/ocean/drive.ini index 6a1d776e78..3f9a553013 100644 --- a/pufferlib/config/ocean/drive.ini +++ b/pufferlib/config/ocean/drive.ini @@ -165,8 +165,8 @@ eval_interval = 1000 map_dir = "resources/drive/binaries/training" ; Evaluation will run on the first num_maps maps in the map_dir directory num_agents = 512 -num_maps=29 -eval_batch_size = 10 +num_maps=10000 +eval_batch_size = 128 ; Put control_sdc_only for log-replay, "control_vehicles" for self-play control_mode = "control_vehicles" diff --git a/pufferlib/ocean/drive/drive.py b/pufferlib/ocean/drive/drive.py index 497fb90b9f..96ee2dc733 100644 --- a/pufferlib/ocean/drive/drive.py +++ b/pufferlib/ocean/drive/drive.py @@ -391,6 +391,11 @@ def reset(self, seed=0): def step(self, actions): self.terminals[:] = 0 self.actions[:] = actions + + # If an eval worker already finished its job we want to skip + if self.eval_mode and self.resample_frequency == 0 and self.tick >= self.episode_length: + return (self.observations, self.rewards, self.terminals, self.truncations, []) + binding.vec_step(self.c_envs) self.tick += 1 info = [] From b8f45c901dafdf01db3c521d0b7ab128eecb6019 Mon Sep 17 00:00:00 2001 From: Wael Doulazmi Date: Mon, 2 Mar 2026 08:38:07 +0000 Subject: [PATCH 09/10] Fix the +1 in episode length --- 
pufferlib/ocean/drive/drive.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pufferlib/ocean/drive/drive.h b/pufferlib/ocean/drive/drive.h index 58278c6b46..a1ecd3c922 100644 --- a/pufferlib/ocean/drive/drive.h +++ b/pufferlib/ocean/drive/drive.h @@ -102,7 +102,7 @@ #define MAX_ROAD_SEGMENT_OBSERVATIONS 128 #ifndef MAX_AGENTS // Needs to be replaced with MAX_PARTNER_OBS(agents in obs_radius) throughout observations code and // with env->max_agents_in_sim throughout all agent for loops -#define MAX_AGENTS 32 +#define MAX_AGENTS 256 #endif #define STOP_AGENT 1 #define REMOVE_AGENT 2 @@ -2757,7 +2757,7 @@ void c_step(Drive *env) { break; } } - int reached_time_limit = (env->timestep + 1) >= env->episode_length; + int reached_time_limit = env->timestep >= env->episode_length; int reached_early_termination = (!originals_remaining && env->termination_mode == 1); if (reached_time_limit || reached_early_termination) { for (int i = 0; i < env->active_agent_count; i++) { From 5936f6a8580a5d20f12d595d67d276007de5ab94 Mon Sep 17 00:00:00 2001 From: Wael Doulazmi Date: Mon, 2 Mar 2026 09:36:12 +0000 Subject: [PATCH 10/10] Eval batch size was actually useless --- pufferlib/config/ocean/drive.ini | 2 +- pufferlib/ocean/drive/binding.c | 5 ++--- pufferlib/ocean/drive/drive.py | 8 ++------ pufferlib/pufferl.py | 1 - 4 files changed, 5 insertions(+), 11 deletions(-) diff --git a/pufferlib/config/ocean/drive.ini b/pufferlib/config/ocean/drive.ini index 3f9a553013..444e64b5ef 100644 --- a/pufferlib/config/ocean/drive.ini +++ b/pufferlib/config/ocean/drive.ini @@ -166,7 +166,7 @@ map_dir = "resources/drive/binaries/training" ; Evaluation will run on the first num_maps maps in the map_dir directory num_agents = 512 num_maps=10000 -eval_batch_size = 128 + ; Put control_sdc_only for log-replay, "control_vehicles" for self-play control_mode = "control_vehicles" diff --git a/pufferlib/ocean/drive/binding.c b/pufferlib/ocean/drive/binding.c index 2eeae2135b..9619101210 
100644 --- a/pufferlib/ocean/drive/binding.c +++ b/pufferlib/ocean/drive/binding.c @@ -91,10 +91,9 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { float min_goal_distance = unpack(kwargs, "min_goal_distance"); float max_goal_distance = unpack(kwargs, "max_goal_distance"); - int eval_batch_size = unpack(kwargs, "eval_batch_size"); int eval_map_counter = unpack(kwargs, "eval_map_counter"); int eval_last_map = unpack(kwargs, "eval_last_map"); - bool eval_mode = eval_batch_size > 0; + bool eval_mode = eval_map_counter >= 0; float reward_bound_goal_radius_min = unpack(kwargs, "reward_bound_goal_radius_min"); float reward_bound_goal_radius_max = unpack(kwargs, "reward_bound_goal_radius_max"); @@ -138,7 +137,7 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { // The following lines should be modified when we want to do eval in GIGAFLOW if (eval_mode) { int remaining_maps = eval_last_map - eval_map_counter; - max_envs = eval_batch_size < remaining_maps ? eval_batch_size : remaining_maps; + max_envs = num_agents < remaining_maps ? 
num_agents : remaining_maps; } int map_idx = eval_map_counter; diff --git a/pufferlib/ocean/drive/drive.py b/pufferlib/ocean/drive/drive.py index 9ea841ceec..a28b1cb3fa 100644 --- a/pufferlib/ocean/drive/drive.py +++ b/pufferlib/ocean/drive/drive.py @@ -48,8 +48,7 @@ def __init__( control_mode="control_vehicles", map_dir="resources/drive/binaries/training", allow_fewer_maps=True, - eval_batch_size=-1, - eval_starting_map=0, + eval_starting_map=-1, eval_num_maps_to_process=1, # reward randomization bounds reward_bound_goal_radius_min=2.0, @@ -111,8 +110,7 @@ def __init__( self.termination_mode = termination_mode self.resample_frequency = resample_frequency self.dynamics_model = dynamics_model - self.eval_batch_size = eval_batch_size - self.eval_mode = eval_batch_size > 0 + self.eval_mode = eval_starting_map >= 0 # reward randomization bounds self.reward_bound_goal_radius_min = reward_bound_goal_radius_min self.reward_bound_collision_min = reward_bound_collision_min @@ -263,7 +261,6 @@ def __init__( reward_conditioning=self.reward_conditioning, min_goal_distance=self.min_goal_distance, max_goal_distance=self.max_goal_distance, - eval_batch_size=self.eval_batch_size, eval_map_counter=self.eval_map_counter, eval_last_map=self.eval_last_map, reward_bound_goal_radius_min=self.reward_bound_goal_radius_min, @@ -424,7 +421,6 @@ def step(self, actions): reward_conditioning=self.reward_conditioning, min_goal_distance=self.min_goal_distance, max_goal_distance=self.max_goal_distance, - eval_batch_size=self.eval_batch_size, eval_map_counter=self.eval_map_counter, eval_last_map=self.eval_last_map, # reward randomization bounds diff --git a/pufferlib/pufferl.py b/pufferlib/pufferl.py index 009fac38cd..8b73b36d2f 100644 --- a/pufferlib/pufferl.py +++ b/pufferlib/pufferl.py @@ -1335,7 +1335,6 @@ def eval_womd(env_name, args=None, vecenv=None, policy=None): args["env"]["episode_length"] = 91 # WOMD scenario length args["env"]["resample_frequency"] = 91 - 
args["env"]["eval_batch_size"] = args["eval"]["eval_batch_size"] args["env"]["map_dir"] = args["eval"]["map_dir"] # For now you set num_maps in drive.ini and you will evaluate on all the num_maps