diff --git a/pufferlib/config/ocean/drive.ini b/pufferlib/config/ocean/drive.ini index b9d3bac319..444e64b5ef 100644 --- a/pufferlib/config/ocean/drive.ini +++ b/pufferlib/config/ocean/drive.ini @@ -164,7 +164,13 @@ eval_interval = 1000 ; Path to dataset used for evaluation map_dir = "resources/drive/binaries/training" ; Evaluation will run on the first num_maps maps in the map_dir directory -num_maps = 20 +num_agents = 512 +num_maps=10000 + + +; Put control_sdc_only for log-replay, "control_vehicles" for self-play +control_mode = "control_vehicles" + backend = PufferEnv ; WOSAC (Waymo Open Sim Agents Challenge) evaluation settings ; If True, enables evaluation on realism metrics each time we save a checkpoint diff --git a/pufferlib/ocean/drive/binding.c b/pufferlib/ocean/drive/binding.c index 303917c869..9619101210 100644 --- a/pufferlib/ocean/drive/binding.c +++ b/pufferlib/ocean/drive/binding.c @@ -91,6 +91,10 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { float min_goal_distance = unpack(kwargs, "min_goal_distance"); float max_goal_distance = unpack(kwargs, "max_goal_distance"); + int eval_map_counter = unpack(kwargs, "eval_map_counter"); + int eval_last_map = unpack(kwargs, "eval_last_map"); + bool eval_mode = eval_map_counter >= 0; + float reward_bound_goal_radius_min = unpack(kwargs, "reward_bound_goal_radius_min"); float reward_bound_goal_radius_max = unpack(kwargs, "reward_bound_goal_radius_max"); float reward_bound_collision_min = unpack(kwargs, "reward_bound_collision_min"); @@ -124,20 +128,25 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { float reward_bound_acc_min = unpack(kwargs, "reward_bound_acc_min"); float reward_bound_acc_max = unpack(kwargs, "reward_bound_acc_max"); - int use_all_maps = unpack(kwargs, "use_all_maps"); - clock_gettime(CLOCK_REALTIME, &ts); srand(ts.tv_nsec); int total_agent_count = 0; int env_count = 0; - int max_envs = use_all_maps ? num_maps : num_agents; - int map_idx = 0; + int max_envs = num_agents; + + // The following lines should be modified when we want to do eval in GIGAFLOW + if (eval_mode) { + int remaining_maps = eval_last_map - eval_map_counter; + max_envs = num_agents < remaining_maps ? num_agents : remaining_maps; + } + + int map_idx = eval_map_counter; int maps_checked = 0; PyObject *agent_offsets = PyList_New(max_envs + 1); PyObject *map_ids = PyList_New(max_envs); // getting env count - while (use_all_maps ? map_idx < max_envs : total_agent_count < num_agents && env_count < max_envs) { - int map_id = use_all_maps ? map_idx++ : rand() % num_maps; + while (total_agent_count < num_agents && env_count < max_envs) { + int map_id = (eval_mode ? map_idx++ : rand()) % num_maps; Drive *env = calloc(1, sizeof(Drive)); env->init_mode = init_mode; env->control_mode = control_mode; @@ -179,9 +188,12 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { load_map_binary(map_file_path, env); set_active_agents(env); + // In eval mode, we want all the active agents in the env (no cropped last env) + int uncomplete_last_env = eval_mode && (total_agent_count + env->active_agent_count > num_agents); + // Skip map if it doesn't contain any controllable agents - if (env->active_agent_count == 0) { - if (!use_all_maps) { + if (env->active_agent_count == 0 || uncomplete_last_env) { + if (!eval_mode) { maps_checked++; // Safeguard: if we've checked all available maps and found no active agents, raise an error @@ -221,6 +233,12 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { free(env->static_agent_indices); free(env->expert_static_agent_indices); free(env); + // In a case a map has more than num_agents the eval will break + // I don't put an assert here, but I think in a coming PR we should set MAX_AGENTS directly in python + // And we will add an assert to avoid this. + if (uncomplete_last_env) { + break; + } continue; } @@ -248,7 +266,7 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { } // printf("Generated %d environments to cover %d agents (requested %d agents)\n", env_count, total_agent_count, // num_agents); - if (!use_all_maps && total_agent_count >= num_agents) { + if (total_agent_count >= num_agents) { total_agent_count = num_agents; } PyObject *final_total_agent_count = PyLong_FromLong(total_agent_count); diff --git a/pufferlib/ocean/drive/drive.h b/pufferlib/ocean/drive/drive.h index 98b3ef1edd..a1ecd3c922 100644 --- a/pufferlib/ocean/drive/drive.h +++ b/pufferlib/ocean/drive/drive.h @@ -102,7 +102,7 @@ #define MAX_ROAD_SEGMENT_OBSERVATIONS 128 #ifndef MAX_AGENTS // Needs to be replaced with MAX_PARTNER_OBS(agents in obs_radius) throughout observations code and // with env->max_agents_in_sim throughout all agent for loops -#define MAX_AGENTS 32 +#define MAX_AGENTS 256 #endif #define STOP_AGENT 1 #define REMOVE_AGENT 2 @@ -1496,10 +1496,24 @@ void set_active_agents(Drive *env) { env->num_agents = env->max_agents_in_sim; } + // If we have a SDC index (WOMD), initialize it first: + int sdc_index = env->sdc_track_index; + if (sdc_index >= 0) { + active_agent_indices[0] = sdc_index; + env->num_created_agents++; + env->active_agent_count++; + env->agents[sdc_index].active_agent = 1; + } + // Iterate through entities to find agents to create and/or control for (int i = 0; i < env->num_objects && env->num_created_agents < env->max_agents_in_sim; i++) { Agent *entity = &env->agents[i]; + // Skip if its the SDC + if (i == sdc_index) { + continue; + } + // Skip if not valid at initialization if (entity->log_valid[env->init_steps] != 1) { continue; @@ -2743,7 +2757,7 @@ void c_step(Drive *env) { break; } } - int reached_time_limit = (env->timestep + 1) >= env->episode_length; + int reached_time_limit = env->timestep >= env->episode_length; int reached_early_termination = (!originals_remaining && env->termination_mode == 1); if (reached_time_limit || reached_early_termination) { for (int i = 0; i < env->active_agent_count; i++) { diff --git a/pufferlib/ocean/drive/drive.py b/pufferlib/ocean/drive/drive.py index c4bd00cf84..a28b1cb3fa 100644 --- a/pufferlib/ocean/drive/drive.py +++ b/pufferlib/ocean/drive/drive.py @@ -47,8 +47,9 @@ def __init__( init_mode="create_all_valid", control_mode="control_vehicles", map_dir="resources/drive/binaries/training", - use_all_maps=False, allow_fewer_maps=True, + eval_starting_map=-1, + eval_num_maps_to_process=1, # reward randomization bounds reward_bound_goal_radius_min=2.0, reward_bound_goal_radius_max=12.0, @@ -109,6 +110,7 @@ def __init__( self.termination_mode = termination_mode self.resample_frequency = resample_frequency self.dynamics_model = dynamics_model + self.eval_mode = eval_starting_map >= 0 # reward randomization bounds self.reward_bound_goal_radius_min = reward_bound_goal_radius_min self.reward_bound_collision_min = reward_bound_collision_min @@ -242,6 +244,10 @@ def __init__( f"Please reduce num_maps, add more maps to {map_dir}, or set allow_fewer_maps=True." ) + # This is to track on which maps the evaluation is running + self.eval_map_counter = eval_starting_map + self.eval_num_maps_to_process = eval_num_maps_to_process + self.eval_last_map = eval_starting_map + eval_num_maps_to_process # Iterate through all maps to count total agents that can be initialized for each map agent_offsets, map_ids, num_envs = binding.shared( map_files=self.map_files, @@ -255,6 +261,8 @@ def __init__( reward_conditioning=self.reward_conditioning, min_goal_distance=self.min_goal_distance, max_goal_distance=self.max_goal_distance, + eval_map_counter=self.eval_map_counter, + eval_last_map=self.eval_last_map, reward_bound_goal_radius_min=self.reward_bound_goal_radius_min, reward_bound_goal_radius_max=self.reward_bound_goal_radius_max, reward_bound_collision_min=self.reward_bound_collision_min, @@ -287,14 +295,14 @@ def __init__( reward_bound_steer_max=self.reward_bound_steer_max, reward_bound_acc_min=self.reward_bound_acc_min, reward_bound_acc_max=self.reward_bound_acc_max, - use_all_maps=use_all_maps, ) - # agent_offsets[-1] works in both cases, just making it explicit that num_agents is ignored if use_all_maps - self.num_agents = num_agents if not use_all_maps else agent_offsets[-1] + self.num_agents = num_agents self.agent_offsets = agent_offsets self.map_ids = map_ids self.num_envs = num_envs + # NOTE: What will happen if the map_counter becomes higher than num_maps ? + self.eval_map_counter += num_envs super().__init__(buf=buf) env_ids = [] for i in range(num_envs): @@ -382,14 +390,22 @@ def step(self, actions): self.terminals[:] = 0 self.truncations[:] = 0 self.actions[:] = actions + + # If an eval worker already finished its job we want to skip + if self.eval_mode and self.resample_frequency == 0 and self.tick >= self.episode_length: + return (self.observations, self.rewards, self.terminals, self.truncations, []) + binding.vec_step(self.c_envs) self.tick += 1 info = [] if self.tick % self.report_interval == 0: - log = binding.vec_log(self.c_envs, self.num_agents) + log = binding.vec_log(self.c_envs, self.num_agents, self.eval_mode) if log: info.append(log) # print(log) + # In eval mode, I don't want to resample after the last batch (womd specific) + if self.eval_mode and self.eval_map_counter >= self.eval_last_map: + self.resample_frequency = 0 if self.tick > 0 and self.resample_frequency > 0 and self.tick % self.resample_frequency == 0: self.tick = 0 binding.vec_close(self.c_envs) @@ -405,6 +421,8 @@ def step(self, actions): reward_conditioning=self.reward_conditioning, min_goal_distance=self.min_goal_distance, max_goal_distance=self.max_goal_distance, + eval_map_counter=self.eval_map_counter, + eval_last_map=self.eval_last_map, # reward randomization bounds reward_bound_collision_min=self.reward_bound_collision_min, reward_bound_goal_radius_min=self.reward_bound_goal_radius_min, @@ -438,11 +456,11 @@ def step(self, actions): reward_bound_steer_max=self.reward_bound_steer_max, reward_bound_acc_min=self.reward_bound_acc_min, reward_bound_acc_max=self.reward_bound_acc_max, - use_all_maps=False, ) self.agent_offsets = agent_offsets self.map_ids = map_ids self.num_envs = num_envs + self.eval_map_counter += num_envs env_ids = [] seed = np.random.randint(0, 2**32 - 1) for i in range(num_envs): diff --git a/pufferlib/ocean/env_binding.h b/pufferlib/ocean/env_binding.h index 6a411492bc..d3cb3d2927 100644 --- a/pufferlib/ocean/env_binding.h +++ b/pufferlib/ocean/env_binding.h @@ -556,8 +556,8 @@ static int assign_to_dict(PyObject *dict, char *key, float value) { } static PyObject *vec_log(PyObject *self, PyObject *args) { - if (PyTuple_Size(args) != 2) { - PyErr_SetString(PyExc_TypeError, "vec_log requires 2 arguments"); + if (PyTuple_Size(args) != 3) { + PyErr_SetString(PyExc_TypeError, "vec_log requires 3 arguments"); return NULL; } @@ -566,13 +566,54 @@ static PyObject *vec_log(PyObject *self, PyObject *args) { return NULL; } + PyObject *eval_mode_arg = PyTuple_GetItem(args, 2); + int eval_mode = PyObject_IsTrue(eval_mode_arg); + // Iterates over logs one float at a time. Will break // horribly if Log has non-float data. PyObject *num_agents_arg = PyTuple_GetItem(args, 1); float num_agents = (float)PyLong_AsLong(num_agents_arg); - Log aggregate = {0}; int num_keys = sizeof(Log) / sizeof(float); + + // Core Idea: In train we want aggregated metrics, in eval we want them per-episode + if (eval_mode) { + if (vec->envs[0]->log.n == 0) { + return PyDict_New(); + } + + PyObject *list = PyList_New(vec->num_envs); + + for (int i = 0; i < vec->num_envs; i++) { + PyObject *dict = PyDict_New(); + Env *curr_env = vec->envs[i]; + float n = curr_env->log.n; + + for (int j = 0; j < num_keys; j++) { + ((float *)&curr_env->log)[j] /= n; + } + + curr_env->log.completion_rate = + curr_env->log.goals_reached_this_episode / curr_env->log.goals_sampled_this_episode; + + my_log(dict, &curr_env->log); + assign_to_dict(dict, "n", n); + + // For now I use the map_name like map_123.bin, in further commit I'll add the actual name + if (curr_env->map_name) { + PyObject *s = PyUnicode_FromString(curr_env->map_name); + if (s != NULL) { + PyDict_SetItemString(dict, "map_name", s); + Py_DECREF(s); + } + } + + PyList_SetItem(list, i, dict); + } + return list; + } + + Log aggregate = {0}; for (int i = 0; i < vec->num_envs; i++) { Env *env = vec->envs[i]; for (int j = 0; j < num_keys; j++) { diff --git a/pufferlib/pufferl.py b/pufferlib/pufferl.py index 49486037d2..8b73b36d2f 100644 --- a/pufferlib/pufferl.py +++ b/pufferlib/pufferl.py @@ -1245,7 +1245,7 @@ def eval(env_name, args=None, vecenv=None, policy=None): backend = args["eval"].get("backend", "PufferEnv") args["vec"] = dict(backend=backend, num_envs=1) - args["env"]["control_mode"] = args["eval"]["human_replay_control_mode"] + args["env"]["control_mode"] = args["eval"]["control_mode"] args["env"]["episode_length"] = 91 # WOMD scenario length vecenv = vecenv or load_env(env_name, args) @@ -1323,6 +1323,120 @@ def eval(env_name, args=None, vecenv=None, policy=None): frames.append("Done") +def eval_womd(env_name, args=None, vecenv=None, policy=None): + import copy + import time + import pandas as pd + + t0 = time.time() + args = args or load_config(env_name) + + args["env"]["control_mode"] = args["eval"]["control_mode"] + args["env"]["episode_length"] = 91 # WOMD scenario length + args["env"]["resample_frequency"] = 91 + + args["env"]["map_dir"] = args["eval"]["map_dir"] + + # For now you set num_maps in drive.ini and you will evaluate on all the num_maps + # This is specific to WOMD, when we will also include GIGAFLOW we + num_maps = args["eval"]["num_maps"] + args["env"]["num_maps"] = args["eval"]["num_maps"] + args["env"]["num_agents"] = args["eval"]["num_agents"] + + # Multiprocessing logic (Distribute maps across workers) + num_workers = args["vec"]["num_workers"] + args["vec"]["batch_size"] = num_workers + + if num_workers > num_maps: + raise pufferlib.APIUsageError( + f"You requested to use {num_workers} workers for evaluating on only {num_maps} maps" + ) + env_kwargs = [] + maps_per_worker = num_maps // num_workers + remainder = num_maps % num_workers + current_start = 0 + for i in range(num_workers): + worker_kwargs = copy.deepcopy(args["env"]) + # Give one extra scenario to the first remainder workers + worker_num_maps = maps_per_worker + (1 if i < remainder else 0) + + worker_kwargs["eval_starting_map"] = current_start + worker_kwargs["eval_num_maps_to_process"] = worker_num_maps + env_kwargs.append(worker_kwargs) + + current_start += worker_num_maps + + # Instantiate each env with its own kwargs + env_module = importlib.import_module("pufferlib.ocean") + make_env = env_module.env_creator("puffer_drive") + env_creators = [make_env] * num_workers + env_args = [[]] * num_workers + + vecenv = pufferlib.vector.make(env_creators, env_args=env_args, env_kwargs=env_kwargs, **args["vec"]) + policy = policy or load_policy(args, vecenv, env_name) + + num_agents = vecenv.observation_space.shape[0] + device = args["train"]["device"] + + global_infos = {} + maps_processed = 0 + obs, _ = vecenv.reset() + + while maps_processed < num_maps: + state = {} + if args["train"]["use_rnn"]: + state = dict( + lstm_h=torch.zeros(num_agents, policy.hidden_size, device=device), + lstm_c=torch.zeros(num_agents, policy.hidden_size, device=device), + ) + + for timestep in range(args["env"]["episode_length"]): + with torch.no_grad(): + ob_tensor = torch.as_tensor(obs).to(device) + logits, value = policy.forward_eval(ob_tensor, state) + action, logprob, _ = pufferlib.pytorch.sample_logits(logits) + action_np = action.cpu().numpy().reshape(vecenv.action_space.shape) + + if isinstance(logits, torch.distributions.Normal): + action_np = np.clip(action_np, vecenv.action_space.low, vecenv.action_space.high) + + obs, _, _, _, info_list = vecenv.step(action_np) + + if len(info_list) > 0: + for info_worker in info_list: + for result in info_worker: + # Identify the binary map, and just keep the name + result["map_name"] = result["map_name"].split("/")[-1].split(".")[0] + + for k, v in result.items(): + if k not in global_infos: + global_infos[k] = [] + global_infos[k].append(v) + maps_processed += 1 + + vecenv.close() + t1 = time.time() + print(f"It took {t1 - t0}s to evaluate {num_maps} maps in {args['env']['control_mode']} mode.") + + # Log the results and print the mean + df = pd.DataFrame(global_infos) + cols = ["map_name"] + [col for col in df.columns if col != "map_name"] + df = df[cols] + + num_unique_maps = len(df["map_name"].unique()) + print(f"The logs contain {num_unique_maps} unique maps.") + + # For now I do that, but ofc we should think at a way to manage files properly + df.to_csv("results.csv") + + # Average only the numbers + df_num = df.select_dtypes(include=["number"]) + df_num = df_num.mean() + + print("Average metrics: ") + print(df_num.to_string()) + + def sweep(args=None, env_name=None): args = args or load_config(env_name) if not args["wandb"] and not args["neptune"]: @@ -1766,6 +1880,8 @@ def main(): train(env_name=env_name) elif mode == "eval": eval(env_name=env_name) + elif mode == "eval_womd": + eval_womd(env_name=env_name) elif mode == "sweep": sweep(env_name=env_name) elif mode == "controlled_exp":