Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion pufferlib/config/ocean/drive.ini
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,13 @@ eval_interval = 1000
; Path to dataset used for evaluation
map_dir = "resources/drive/binaries/training"
; Evaluation will run on the first num_maps maps in the map_dir directory
num_maps = 20
num_agents = 512
num_maps=10000


; Use "control_sdc_only" for log-replay, "control_vehicles" for self-play
control_mode = "control_vehicles"

backend = PufferEnv
; WOSAC (Waymo Open Sim Agents Challenge) evaluation settings
; If True, enables evaluation on realism metrics each time we save a checkpoint
Expand Down
36 changes: 27 additions & 9 deletions pufferlib/ocean/drive/binding.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) {
float min_goal_distance = unpack(kwargs, "min_goal_distance");
float max_goal_distance = unpack(kwargs, "max_goal_distance");

int eval_map_counter = unpack(kwargs, "eval_map_counter");
int eval_last_map = unpack(kwargs, "eval_last_map");
bool eval_mode = eval_map_counter >= 0;

float reward_bound_goal_radius_min = unpack(kwargs, "reward_bound_goal_radius_min");
float reward_bound_goal_radius_max = unpack(kwargs, "reward_bound_goal_radius_max");
float reward_bound_collision_min = unpack(kwargs, "reward_bound_collision_min");
Expand Down Expand Up @@ -124,20 +128,25 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) {
float reward_bound_acc_min = unpack(kwargs, "reward_bound_acc_min");
float reward_bound_acc_max = unpack(kwargs, "reward_bound_acc_max");

int use_all_maps = unpack(kwargs, "use_all_maps");

clock_gettime(CLOCK_REALTIME, &ts);
srand(ts.tv_nsec);
int total_agent_count = 0;
int env_count = 0;
int max_envs = use_all_maps ? num_maps : num_agents;
int map_idx = 0;
int max_envs = num_agents;

// The following lines should be modified when we want to do eval in GIGAFLOW
if (eval_mode) {
int remaining_maps = eval_last_map - eval_map_counter;
max_envs = num_agents < remaining_maps ? num_agents : remaining_maps;
}

int map_idx = eval_map_counter;
int maps_checked = 0;
PyObject *agent_offsets = PyList_New(max_envs + 1);
PyObject *map_ids = PyList_New(max_envs);
// getting env count
while (use_all_maps ? map_idx < max_envs : total_agent_count < num_agents && env_count < max_envs) {
int map_id = use_all_maps ? map_idx++ : rand() % num_maps;
while (total_agent_count < num_agents && env_count < max_envs) {
int map_id = (eval_mode ? map_idx++ : rand()) % num_maps;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

modulo operation causes map index wrapping, potentially reprocessing maps

when map_idx exceeds num_maps, the modulo wraps it back to 0, which could cause the same maps to be evaluated multiple times if fewer than expected environments are created in earlier batches

if a worker is assigned maps 0-99 but only creates 95 envs in the first batch, it will start at map 95 next time, then wrap to maps 0-89, causing duplication

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It prevents the code from crashing if a map is skipped, but it will also be useful in the CARLA setting where we want to sample the same 5 maps multiple times.

Once again no maps should be skipped if we initialize the SDC first

Drive *env = calloc(1, sizeof(Drive));
env->init_mode = init_mode;
env->control_mode = control_mode;
Expand Down Expand Up @@ -179,9 +188,12 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) {
load_map_binary(map_file_path, env);
set_active_agents(env);

// In eval mode, we want all the active agents in the env (no cropped last env)
int uncomplete_last_env = eval_mode && (total_agent_count + env->active_agent_count > num_agents);

// Skip map if it doesn't contain any controllable agents
if (env->active_agent_count == 0) {
if (!use_all_maps) {
if (env->active_agent_count == 0 || uncomplete_last_env) {
if (!eval_mode) {
maps_checked++;

// Safeguard: if we've checked all available maps and found no active agents, raise an error
Expand Down Expand Up @@ -221,6 +233,12 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) {
free(env->static_agent_indices);
free(env->expert_static_agent_indices);
free(env);
// If a single map has more active agents than num_agents, the eval will break.
// No assert here for now; in an upcoming PR we should set MAX_AGENTS directly in Python
// and add an assert to prevent this case.
if (uncomplete_last_env) {
break;
}
Comment on lines +236 to +241
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unhandled edge case: map with more agents than num_agents will break eval

the comment acknowledges this but doesn't add error handling - consider validating or asserting that no single map exceeds num_agents

continue;
}

Expand Down Expand Up @@ -248,7 +266,7 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) {
}
// printf("Generated %d environments to cover %d agents (requested %d agents)\n", env_count, total_agent_count,
// num_agents);
if (!use_all_maps && total_agent_count >= num_agents) {
if (total_agent_count >= num_agents) {
total_agent_count = num_agents;
}
PyObject *final_total_agent_count = PyLong_FromLong(total_agent_count);
Expand Down
18 changes: 16 additions & 2 deletions pufferlib/ocean/drive/drive.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@
#define MAX_ROAD_SEGMENT_OBSERVATIONS 128
#ifndef MAX_AGENTS // Needs to be replaced with MAX_PARTNER_OBS(agents in obs_radius) throughout observations code and
// with env->max_agents_in_sim throughout all agent for loops
#define MAX_AGENTS 32
#define MAX_AGENTS 256
#endif
#define STOP_AGENT 1
#define REMOVE_AGENT 2
Expand Down Expand Up @@ -1496,10 +1496,24 @@ void set_active_agents(Drive *env) {
env->num_agents = env->max_agents_in_sim;
}

// If we have a SDC index (WOMD), initialize it first:
int sdc_index = env->sdc_track_index;
if (sdc_index >= 0) {
active_agent_indices[0] = sdc_index;
env->num_created_agents++;
env->active_agent_count++;
env->agents[sdc_index].active_agent = 1;
}
Comment on lines +1499 to +1506
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

initialized SDC (self-driving car) as first active agent to ensure consistent agent ordering for WOMD evaluation


// Iterate through entities to find agents to create and/or control
for (int i = 0; i < env->num_objects && env->num_created_agents < env->max_agents_in_sim; i++) {
Agent *entity = &env->agents[i];

// Skip if its the SDC
if (i == sdc_index) {
continue;
}

// Skip if not valid at initialization
if (entity->log_valid[env->init_steps] != 1) {
continue;
Expand Down Expand Up @@ -2743,7 +2757,7 @@ void c_step(Drive *env) {
break;
}
}
int reached_time_limit = (env->timestep + 1) >= env->episode_length;
int reached_time_limit = env->timestep >= env->episode_length;
int reached_early_termination = (!originals_remaining && env->termination_mode == 1);
if (reached_time_limit || reached_early_termination) {
for (int i = 0; i < env->active_agent_count; i++) {
Expand Down
30 changes: 24 additions & 6 deletions pufferlib/ocean/drive/drive.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ def __init__(
init_mode="create_all_valid",
control_mode="control_vehicles",
map_dir="resources/drive/binaries/training",
use_all_maps=False,
allow_fewer_maps=True,
eval_starting_map=-1,
eval_num_maps_to_process=1,
# reward randomization bounds
reward_bound_goal_radius_min=2.0,
reward_bound_goal_radius_max=12.0,
Expand Down Expand Up @@ -109,6 +110,7 @@ def __init__(
self.termination_mode = termination_mode
self.resample_frequency = resample_frequency
self.dynamics_model = dynamics_model
self.eval_mode = eval_starting_map >= 0
# reward randomization bounds
self.reward_bound_goal_radius_min = reward_bound_goal_radius_min
self.reward_bound_collision_min = reward_bound_collision_min
Expand Down Expand Up @@ -242,6 +244,10 @@ def __init__(
f"Please reduce num_maps, add more maps to {map_dir}, or set allow_fewer_maps=True."
)

# This is to track on which maps the evaluation is running
self.eval_map_counter = eval_starting_map
self.eval_num_maps_to_process = eval_num_maps_to_process
self.eval_last_map = eval_starting_map + eval_num_maps_to_process
# Iterate through all maps to count total agents that can be initialized for each map
agent_offsets, map_ids, num_envs = binding.shared(
map_files=self.map_files,
Expand All @@ -255,6 +261,8 @@ def __init__(
reward_conditioning=self.reward_conditioning,
min_goal_distance=self.min_goal_distance,
max_goal_distance=self.max_goal_distance,
eval_map_counter=self.eval_map_counter,
eval_last_map=self.eval_last_map,
reward_bound_goal_radius_min=self.reward_bound_goal_radius_min,
reward_bound_goal_radius_max=self.reward_bound_goal_radius_max,
reward_bound_collision_min=self.reward_bound_collision_min,
Expand Down Expand Up @@ -287,14 +295,14 @@ def __init__(
reward_bound_steer_max=self.reward_bound_steer_max,
reward_bound_acc_min=self.reward_bound_acc_min,
reward_bound_acc_max=self.reward_bound_acc_max,
use_all_maps=use_all_maps,
)

# agent_offsets[-1] works in both cases, just making it explicit that num_agents is ignored if use_all_maps
self.num_agents = num_agents if not use_all_maps else agent_offsets[-1]
self.num_agents = num_agents
self.agent_offsets = agent_offsets
self.map_ids = map_ids
self.num_envs = num_envs
# NOTE(review): what happens if eval_map_counter exceeds num_maps? Verify wrap-around behavior.
self.eval_map_counter += num_envs
super().__init__(buf=buf)
env_ids = []
for i in range(num_envs):
Expand Down Expand Up @@ -382,14 +390,22 @@ def step(self, actions):
self.terminals[:] = 0
self.truncations[:] = 0
self.actions[:] = actions

# If an eval worker has already finished its assigned maps, skip stepping entirely
if self.eval_mode and self.resample_frequency == 0 and self.tick >= self.episode_length:
return (self.observations, self.rewards, self.terminals, self.truncations, [])
Comment on lines +395 to +396
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

skipped further processing when eval worker completes all assigned maps, preventing unnecessary computation


binding.vec_step(self.c_envs)
self.tick += 1
info = []
if self.tick % self.report_interval == 0:
log = binding.vec_log(self.c_envs, self.num_agents)
log = binding.vec_log(self.c_envs, self.num_agents, self.eval_mode)
if log:
info.append(log)
# print(log)
# In eval mode, don't resample after the last batch has been processed (WOMD-specific)
if self.eval_mode and self.eval_map_counter >= self.eval_last_map:
self.resample_frequency = 0
if self.tick > 0 and self.resample_frequency > 0 and self.tick % self.resample_frequency == 0:
self.tick = 0
binding.vec_close(self.c_envs)
Expand All @@ -405,6 +421,8 @@ def step(self, actions):
reward_conditioning=self.reward_conditioning,
min_goal_distance=self.min_goal_distance,
max_goal_distance=self.max_goal_distance,
eval_map_counter=self.eval_map_counter,
eval_last_map=self.eval_last_map,
# reward randomization bounds
reward_bound_collision_min=self.reward_bound_collision_min,
reward_bound_goal_radius_min=self.reward_bound_goal_radius_min,
Expand Down Expand Up @@ -438,11 +456,11 @@ def step(self, actions):
reward_bound_steer_max=self.reward_bound_steer_max,
reward_bound_acc_min=self.reward_bound_acc_min,
reward_bound_acc_max=self.reward_bound_acc_max,
use_all_maps=False,
)
self.agent_offsets = agent_offsets
self.map_ids = map_ids
self.num_envs = num_envs
self.eval_map_counter += num_envs
env_ids = []
seed = np.random.randint(0, 2**32 - 1)
for i in range(num_envs):
Expand Down
47 changes: 44 additions & 3 deletions pufferlib/ocean/env_binding.h
Original file line number Diff line number Diff line change
Expand Up @@ -556,8 +556,8 @@ static int assign_to_dict(PyObject *dict, char *key, float value) {
}

static PyObject *vec_log(PyObject *self, PyObject *args) {
if (PyTuple_Size(args) != 2) {
PyErr_SetString(PyExc_TypeError, "vec_log requires 2 arguments");
if (PyTuple_Size(args) != 3) {
PyErr_SetString(PyExc_TypeError, "vec_log requires 3 arguments");
return NULL;
}

Expand All @@ -566,13 +566,54 @@ static PyObject *vec_log(PyObject *self, PyObject *args) {
return NULL;
}

PyObject *eval_mode_arg = PyTuple_GetItem(args, 2);
int eval_mode = PyObject_IsTrue(eval_mode_arg);

// Iterates over logs one float at a time. Will break
// horribly if Log has non-float data.
PyObject *num_agents_arg = PyTuple_GetItem(args, 1);
float num_agents = (float)PyLong_AsLong(num_agents_arg);

Log aggregate = {0};
int num_keys = sizeof(Log) / sizeof(float);

// Core idea: in training we want aggregated metrics; in eval we want them per-episode
if (eval_mode) {
if (vec->envs[0]->log.n == 0) {
return PyDict_New();
}

PyObject *list = PyList_New(vec->num_envs);

for (int i = 0; i < vec->num_envs; i++) {
PyObject *dict = PyDict_New();
Env *curr_env = vec->envs[i];
float n = curr_env->log.n;

for (int j = 0; j < num_keys; j++) {
((float *)&curr_env->log)[j] /= n;
}

curr_env->log.completion_rate =
curr_env->log.goals_reached_this_episode / curr_env->log.goals_sampled_this_episode;

my_log(dict, &curr_env->log);
assign_to_dict(dict, "n", n);

// For now map_name looks like "map_123.bin"; a future commit will add the actual map name
if (curr_env->map_name) {
PyObject *s = PyUnicode_FromString(curr_env->map_name);
if (s != NULL) {
PyDict_SetItemString(dict, "map_name", s);
Py_DECREF(s);
}
}

PyList_SetItem(list, i, dict);
}
return list;
}

Log aggregate = {0};
for (int i = 0; i < vec->num_envs; i++) {
Env *env = vec->envs[i];
for (int j = 0; j < num_keys; j++) {
Expand Down
Loading