-
Notifications
You must be signed in to change notification settings - Fork 21
Multiprocessed EVAL #307
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: 3.0_beta
Are you sure you want to change the base?
Multiprocessed EVAL #307
Changes from all commits
ced67e2
77898d1
7ee7429
e3a2fd6
3109747
d581903
ad201a3
f296258
6df1be9
f2c8888
b1e9707
b8f45c9
5936f6a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -91,6 +91,10 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { | |
| float min_goal_distance = unpack(kwargs, "min_goal_distance"); | ||
| float max_goal_distance = unpack(kwargs, "max_goal_distance"); | ||
|
|
||
| int eval_map_counter = unpack(kwargs, "eval_map_counter"); | ||
| int eval_last_map = unpack(kwargs, "eval_last_map"); | ||
| bool eval_mode = eval_map_counter >= 0; | ||
|
|
||
| float reward_bound_goal_radius_min = unpack(kwargs, "reward_bound_goal_radius_min"); | ||
| float reward_bound_goal_radius_max = unpack(kwargs, "reward_bound_goal_radius_max"); | ||
| float reward_bound_collision_min = unpack(kwargs, "reward_bound_collision_min"); | ||
|
|
@@ -124,20 +128,25 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { | |
| float reward_bound_acc_min = unpack(kwargs, "reward_bound_acc_min"); | ||
| float reward_bound_acc_max = unpack(kwargs, "reward_bound_acc_max"); | ||
|
|
||
| int use_all_maps = unpack(kwargs, "use_all_maps"); | ||
|
|
||
| clock_gettime(CLOCK_REALTIME, &ts); | ||
| srand(ts.tv_nsec); | ||
| int total_agent_count = 0; | ||
| int env_count = 0; | ||
| int max_envs = use_all_maps ? num_maps : num_agents; | ||
| int map_idx = 0; | ||
| int max_envs = num_agents; | ||
|
|
||
| // The following lines should be modified when we want to do eval in GIGAFLOW | ||
| if (eval_mode) { | ||
| int remaining_maps = eval_last_map - eval_map_counter; | ||
| max_envs = num_agents < remaining_maps ? num_agents : remaining_maps; | ||
| } | ||
|
|
||
| int map_idx = eval_map_counter; | ||
| int maps_checked = 0; | ||
| PyObject *agent_offsets = PyList_New(max_envs + 1); | ||
| PyObject *map_ids = PyList_New(max_envs); | ||
| // getting env count | ||
| while (use_all_maps ? map_idx < max_envs : total_agent_count < num_agents && env_count < max_envs) { | ||
| int map_id = use_all_maps ? map_idx++ : rand() % num_maps; | ||
| while (total_agent_count < num_agents && env_count < max_envs) { | ||
| int map_id = (eval_mode ? map_idx++ : rand()) % num_maps; | ||
| Drive *env = calloc(1, sizeof(Drive)); | ||
| env->init_mode = init_mode; | ||
| env->control_mode = control_mode; | ||
|
|
@@ -179,9 +188,12 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { | |
| load_map_binary(map_file_path, env); | ||
| set_active_agents(env); | ||
|
|
||
| // In eval mode, we want all the active agents in the env (no cropped last env) | ||
| int uncomplete_last_env = eval_mode && (total_agent_count + env->active_agent_count > num_agents); | ||
|
|
||
| // Skip map if it doesn't contain any controllable agents | ||
| if (env->active_agent_count == 0) { | ||
| if (!use_all_maps) { | ||
| if (env->active_agent_count == 0 || uncomplete_last_env) { | ||
| if (!eval_mode) { | ||
| maps_checked++; | ||
|
|
||
| // Safeguard: if we've checked all available maps and found no active agents, raise an error | ||
|
|
@@ -221,6 +233,12 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { | |
| free(env->static_agent_indices); | ||
| free(env->expert_static_agent_indices); | ||
| free(env); | ||
| // In a case a map has more than num_agents the eval will break | ||
| // I don't put an assert here, but I think in a coming PR we should set MAX_AGENTS directly in python | ||
| // And we will add an assert to avoid this. | ||
| if (uncomplete_last_env) { | ||
| break; | ||
| } | ||
|
Comment on lines
+236
to
+241
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Unhandled edge case: a single map containing more active agents than `num_agents` will break eval. The code comment acknowledges this but doesn't add error handling — consider validating, or asserting, that no single map exceeds `num_agents`. |
||
| continue; | ||
| } | ||
|
|
||
|
|
@@ -248,7 +266,7 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { | |
| } | ||
| // printf("Generated %d environments to cover %d agents (requested %d agents)\n", env_count, total_agent_count, | ||
| // num_agents); | ||
| if (!use_all_maps && total_agent_count >= num_agents) { | ||
| if (total_agent_count >= num_agents) { | ||
| total_agent_count = num_agents; | ||
| } | ||
| PyObject *final_total_agent_count = PyLong_FromLong(total_agent_count); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -102,7 +102,7 @@ | |
| #define MAX_ROAD_SEGMENT_OBSERVATIONS 128 | ||
| #ifndef MAX_AGENTS // Needs to be replaced with MAX_PARTNER_OBS(agents in obs_radius) throughout observations code and | ||
| // with env->max_agents_in_sim throughout all agent for loops | ||
| #define MAX_AGENTS 32 | ||
| #define MAX_AGENTS 256 | ||
| #endif | ||
| #define STOP_AGENT 1 | ||
| #define REMOVE_AGENT 2 | ||
|
|
@@ -1496,10 +1496,24 @@ void set_active_agents(Drive *env) { | |
| env->num_agents = env->max_agents_in_sim; | ||
| } | ||
|
|
||
| // If we have a SDC index (WOMD), initialize it first: | ||
| int sdc_index = env->sdc_track_index; | ||
| if (sdc_index >= 0) { | ||
| active_agent_indices[0] = sdc_index; | ||
| env->num_created_agents++; | ||
| env->active_agent_count++; | ||
| env->agents[sdc_index].active_agent = 1; | ||
| } | ||
|
Comment on lines
+1499
to
+1506
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Initializes the SDC (self-driving car) as the first active agent to ensure consistent agent ordering for WOMD evaluation. |
||
|
|
||
| // Iterate through entities to find agents to create and/or control | ||
| for (int i = 0; i < env->num_objects && env->num_created_agents < env->max_agents_in_sim; i++) { | ||
| Agent *entity = &env->agents[i]; | ||
|
|
||
| // Skip if its the SDC | ||
| if (i == sdc_index) { | ||
| continue; | ||
| } | ||
|
|
||
| // Skip if not valid at initialization | ||
| if (entity->log_valid[env->init_steps] != 1) { | ||
| continue; | ||
|
|
@@ -2743,7 +2757,7 @@ void c_step(Drive *env) { | |
| break; | ||
| } | ||
| } | ||
| int reached_time_limit = (env->timestep + 1) >= env->episode_length; | ||
| int reached_time_limit = env->timestep >= env->episode_length; | ||
| int reached_early_termination = (!originals_remaining && env->termination_mode == 1); | ||
| if (reached_time_limit || reached_early_termination) { | ||
| for (int i = 0; i < env->active_agent_count; i++) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -47,8 +47,9 @@ def __init__( | |
| init_mode="create_all_valid", | ||
| control_mode="control_vehicles", | ||
| map_dir="resources/drive/binaries/training", | ||
| use_all_maps=False, | ||
| allow_fewer_maps=True, | ||
| eval_starting_map=-1, | ||
| eval_num_maps_to_process=1, | ||
| # reward randomization bounds | ||
| reward_bound_goal_radius_min=2.0, | ||
| reward_bound_goal_radius_max=12.0, | ||
|
|
@@ -109,6 +110,7 @@ def __init__( | |
| self.termination_mode = termination_mode | ||
| self.resample_frequency = resample_frequency | ||
| self.dynamics_model = dynamics_model | ||
| self.eval_mode = eval_starting_map >= 0 | ||
| # reward randomization bounds | ||
| self.reward_bound_goal_radius_min = reward_bound_goal_radius_min | ||
| self.reward_bound_collision_min = reward_bound_collision_min | ||
|
|
@@ -242,6 +244,10 @@ def __init__( | |
| f"Please reduce num_maps, add more maps to {map_dir}, or set allow_fewer_maps=True." | ||
| ) | ||
|
|
||
| # This is to track on which maps the evaluation is running | ||
| self.eval_map_counter = eval_starting_map | ||
| self.eval_num_maps_to_process = eval_num_maps_to_process | ||
| self.eval_last_map = eval_starting_map + eval_num_maps_to_process | ||
| # Iterate through all maps to count total agents that can be initialized for each map | ||
| agent_offsets, map_ids, num_envs = binding.shared( | ||
| map_files=self.map_files, | ||
|
|
@@ -255,6 +261,8 @@ def __init__( | |
| reward_conditioning=self.reward_conditioning, | ||
| min_goal_distance=self.min_goal_distance, | ||
| max_goal_distance=self.max_goal_distance, | ||
| eval_map_counter=self.eval_map_counter, | ||
| eval_last_map=self.eval_last_map, | ||
| reward_bound_goal_radius_min=self.reward_bound_goal_radius_min, | ||
| reward_bound_goal_radius_max=self.reward_bound_goal_radius_max, | ||
| reward_bound_collision_min=self.reward_bound_collision_min, | ||
|
|
@@ -287,14 +295,14 @@ def __init__( | |
| reward_bound_steer_max=self.reward_bound_steer_max, | ||
| reward_bound_acc_min=self.reward_bound_acc_min, | ||
| reward_bound_acc_max=self.reward_bound_acc_max, | ||
| use_all_maps=use_all_maps, | ||
| ) | ||
|
|
||
| # agent_offsets[-1] works in both cases, just making it explicit that num_agents is ignored if use_all_maps | ||
| self.num_agents = num_agents if not use_all_maps else agent_offsets[-1] | ||
| self.num_agents = num_agents | ||
| self.agent_offsets = agent_offsets | ||
| self.map_ids = map_ids | ||
| self.num_envs = num_envs | ||
| # NOTE: What will happen if the map_counter becomes higher than num_maps ? | ||
| self.eval_map_counter += num_envs | ||
| super().__init__(buf=buf) | ||
| env_ids = [] | ||
| for i in range(num_envs): | ||
|
|
@@ -382,14 +390,22 @@ def step(self, actions): | |
| self.terminals[:] = 0 | ||
| self.truncations[:] = 0 | ||
| self.actions[:] = actions | ||
|
|
||
| # If an eval worker already finished its job we want to skip | ||
| if self.eval_mode and self.resample_frequency == 0 and self.tick >= self.episode_length: | ||
| return (self.observations, self.rewards, self.terminals, self.truncations, []) | ||
|
Comment on lines
+395
to
+396
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Skips further processing when an eval worker has completed all of its assigned maps, preventing unnecessary computation. |
||
|
|
||
| binding.vec_step(self.c_envs) | ||
| self.tick += 1 | ||
| info = [] | ||
| if self.tick % self.report_interval == 0: | ||
| log = binding.vec_log(self.c_envs, self.num_agents) | ||
| log = binding.vec_log(self.c_envs, self.num_agents, self.eval_mode) | ||
| if log: | ||
| info.append(log) | ||
| # print(log) | ||
| # In eval mode, I don't want to resample after the last batch (womd specific) | ||
| if self.eval_mode and self.eval_map_counter >= self.eval_last_map: | ||
| self.resample_frequency = 0 | ||
| if self.tick > 0 and self.resample_frequency > 0 and self.tick % self.resample_frequency == 0: | ||
| self.tick = 0 | ||
| binding.vec_close(self.c_envs) | ||
|
|
@@ -405,6 +421,8 @@ def step(self, actions): | |
| reward_conditioning=self.reward_conditioning, | ||
| min_goal_distance=self.min_goal_distance, | ||
| max_goal_distance=self.max_goal_distance, | ||
| eval_map_counter=self.eval_map_counter, | ||
| eval_last_map=self.eval_last_map, | ||
| # reward randomization bounds | ||
| reward_bound_collision_min=self.reward_bound_collision_min, | ||
| reward_bound_goal_radius_min=self.reward_bound_goal_radius_min, | ||
|
|
@@ -438,11 +456,11 @@ def step(self, actions): | |
| reward_bound_steer_max=self.reward_bound_steer_max, | ||
| reward_bound_acc_min=self.reward_bound_acc_min, | ||
| reward_bound_acc_max=self.reward_bound_acc_max, | ||
| use_all_maps=False, | ||
| ) | ||
| self.agent_offsets = agent_offsets | ||
| self.map_ids = map_ids | ||
| self.num_envs = num_envs | ||
| self.eval_map_counter += num_envs | ||
| env_ids = [] | ||
| seed = np.random.randint(0, 2**32 - 1) | ||
| for i in range(num_envs): | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The modulo operation causes map-index wrapping, potentially reprocessing maps.
When
`map_idx` exceeds `num_maps`, the modulo wraps it back to 0, which could cause the same maps to be evaluated multiple times if fewer environments than expected are created in earlier batches. For example, if a worker is assigned maps 0-99 but only creates 95 envs in the first batch, it will start at map 95 next time, then wrap around to maps 0-89, causing duplication.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It prevents the code from crashing if a map is skipped, but it will also be useful in the CARLA setting, where we want to sample the same 5 maps multiple times.
Once again, no maps should be skipped if we initialize the SDC first.