diff --git a/pufferlib/ocean/drive/binding.c b/pufferlib/ocean/drive/binding.c index f178fedfc3..361e50b9b8 100644 --- a/pufferlib/ocean/drive/binding.c +++ b/pufferlib/ocean/drive/binding.c @@ -1,7 +1,75 @@ +#include #include "drive.h" #define Env Drive #define MY_SHARED #define MY_PUT + +// Module-level map cache: indexed by map_id, populated by my_shared(), used by my_init(). +static SharedMapData **g_map_cache = NULL; +static int g_map_cache_size = 0; +static pid_t g_map_cache_pid = 0; // PID of the process that created the cache +// Cache key: params that affect SharedMapData contents (roads, grid, polylines) +static float g_cache_observation_window_size = 0; +static float g_cache_polyline_reduction_threshold = 0; +static float g_cache_polyline_max_segment_length = 0; +static char **g_cache_map_paths = NULL; // strdup'd file paths, one per map_id + +static void reset_cache_globals(void) { + g_map_cache = NULL; + g_map_cache_size = 0; + g_map_cache_pid = 0; + g_cache_observation_window_size = 0; + g_cache_polyline_reduction_threshold = 0; + g_cache_polyline_max_segment_length = 0; + g_cache_map_paths = NULL; +} + +static void release_map_cache_internal(void) { + if (g_map_cache == NULL) + return; + // After fork, child inherits g_map_cache pointers via copy-on-write. + // We must NOT free them — they belong to the parent's address space. + // Discarding the pointers "leaks" the CoW pages in the child, but this + // is bounded (one map cache worth of memory per worker) and unavoidable + // with fork-based multiprocessing. The child rebuilds its own cache on + // the next my_shared() call. + if (g_map_cache_pid != 0 && g_map_cache_pid != getpid()) { + reset_cache_globals(); + return; + } + for (int i = 0; i < g_map_cache_size; i++) { + if (g_map_cache[i] != NULL) { + if (g_map_cache[i]->ref_count > 0) { + fprintf(stderr, + "ERROR: cannot release map cache — entry %d still has %d live env(s). " + "Close all Drive instances before changing map config.\n", + i, g_map_cache[i]->ref_count); + return; // Refuse to free — callers must close envs first + } + } + } + for (int i = 0; i < g_map_cache_size; i++) { + if (g_map_cache[i] != NULL) { + free_shared_map_data(g_map_cache[i]); + } + } + free(g_map_cache); + if (g_cache_map_paths != NULL) { + for (int i = 0; i < g_map_cache_size; i++) { + free(g_cache_map_paths[i]); + } + free(g_cache_map_paths); + } + reset_cache_globals(); +} + +static PyObject *release_map_cache_py(PyObject *self, PyObject *args) { + release_map_cache_internal(); + Py_RETURN_NONE; +} + +#define MY_METHODS {"release_map_cache", release_map_cache_py, METH_VARARGS, "Release the shared map data cache"} + #include "../env_binding.h" static int my_put(Env *env, PyObject *args, PyObject *kwargs) { @@ -137,6 +205,50 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { clock_gettime(CLOCK_REALTIME, &ts); srand(ts.tv_nsec); + // Reuse existing cache if it was created by this process with matching config. + // The cache key includes: PID, num_maps, map file paths, and params that + // affect SharedMapData (observation_window_size for grid, polyline params for + // road simplification). Other params (init_mode, reward bounds, etc.) only + // affect per-env state and don't change the shared map data. + int reuse_cache = (g_map_cache != NULL && g_map_cache_pid == getpid() && g_map_cache_size == num_maps && + g_cache_observation_window_size == observation_window_size && + g_cache_polyline_reduction_threshold == polyline_reduction_threshold && + g_cache_polyline_max_segment_length == polyline_max_segment_length); + // Also check that map file paths haven't changed + if (reuse_cache && g_cache_map_paths != NULL) { + for (int i = 0; i < num_maps; i++) { + const char *path = PyUnicode_AsUTF8(PyList_GetItem(map_files_list, i)); + if (g_cache_map_paths[i] == NULL || strcmp(g_cache_map_paths[i], path) != 0) { + reuse_cache = 0; + break; + } + } + } + + if (!reuse_cache) { + release_map_cache_internal(); + if (g_map_cache != NULL) { + // release_map_cache_internal refused to free because envs are still alive. + // Cannot change map config while Drive instances exist. + PyErr_SetString(PyExc_RuntimeError, + "Cannot change map cache config while Drive environments are still open. " + "Call close() on all Drive instances first."); + return NULL; + } + g_map_cache_size = num_maps; + g_map_cache = (SharedMapData **)calloc(num_maps, sizeof(SharedMapData *)); + g_map_cache_pid = getpid(); + g_cache_observation_window_size = observation_window_size; + g_cache_polyline_reduction_threshold = polyline_reduction_threshold; + g_cache_polyline_max_segment_length = polyline_max_segment_length; + // Store map file paths for future reuse checks + g_cache_map_paths = (char **)calloc(num_maps, sizeof(char *)); + for (int i = 0; i < num_maps; i++) { + const char *path = PyUnicode_AsUTF8(PyList_GetItem(map_files_list, i)); + g_cache_map_paths[i] = strdup(path); + } + } + int max_envs = use_all_maps ? num_maps : num_agents; if (init_mode == INIT_VARIABLE_AGENT_NUMBER) { @@ -168,9 +280,19 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { int offset = 0; for (int i = 0; i < env_count; i++) { + int map_id = rand() % num_maps; PyList_SetItem(agent_offsets, i, PyLong_FromLong(offset)); - PyList_SetItem(map_ids_list, i, PyLong_FromLong(rand() % num_maps)); + PyList_SetItem(map_ids_list, i, PyLong_FromLong(map_id)); offset += agent_counts[i]; + + // Lazily populate map cache for assigned maps + if (g_map_cache[map_id] == NULL) { + PyObject *map_file_obj = PyList_GetItem(map_files_list, map_id); + const char *map_file_path = PyUnicode_AsUTF8(map_file_obj); + g_map_cache[map_id] = + create_shared_map_data(map_file_path, init_mode, control_mode, init_steps, observation_window_size, + polyline_reduction_threshold, polyline_max_segment_length); + } } PyList_SetItem(agent_offsets, env_count, PyLong_FromLong(num_agents)); // In random mode, we guarantee num_agents accross all envs @@ -189,74 +311,46 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { PyObject *agent_offsets = PyList_New(max_envs + 1); PyObject *map_ids = PyList_New(max_envs); - // getting env count + // getting env count — use cached SharedMapData to count agents instead of loading per-env while (use_all_maps ? map_idx < max_envs : total_agent_count < num_agents && env_count < max_envs) { int map_id = use_all_maps ? map_idx++ : rand() % num_maps; - Drive *env = calloc(1, sizeof(Drive)); - env->init_mode = init_mode; - env->control_mode = control_mode; - env->init_steps = init_steps; - env->goal_behavior = goal_behavior; - env->reward_randomization = reward_randomization; - env->turn_off_normalization = turn_off_normalization; - env->reward_conditioning = reward_conditioning; - env->min_goal_distance = min_goal_distance; - env->max_goal_distance = max_goal_distance; - env->observation_window_size = observation_window_size; - env->polyline_reduction_threshold = polyline_reduction_threshold; - env->polyline_max_segment_length = polyline_max_segment_length; - env->min_avg_speed_to_consider_goal_attempt = min_avg_speed_to_consider_goal_attempt; - // reward randomization bounds - env->reward_bounds[REWARD_COEF_GOAL_RADIUS] = - (RewardBound){reward_bound_goal_radius_min, reward_bound_goal_radius_max}; - env->reward_bounds[REWARD_COEF_COLLISION] = - (RewardBound){reward_bound_collision_min, reward_bound_collision_max}; - env->reward_bounds[REWARD_COEF_OFFROAD] = (RewardBound){reward_bound_offroad_min, reward_bound_offroad_max}; - env->reward_bounds[REWARD_COEF_COMFORT] = (RewardBound){reward_bound_comfort_min, reward_bound_comfort_max}; - env->reward_bounds[REWARD_COEF_LANE_ALIGN] = - (RewardBound){reward_bound_lane_align_min, reward_bound_lane_align_max}; - env->reward_bounds[REWARD_COEF_LANE_CENTER] = - (RewardBound){reward_bound_lane_center_min, reward_bound_lane_center_max}; - env->reward_bounds[REWARD_COEF_VELOCITY] = (RewardBound){reward_bound_velocity_min, reward_bound_velocity_max}; - env->reward_bounds[REWARD_COEF_TRAFFIC_LIGHT] = - (RewardBound){reward_bound_traffic_light_min, reward_bound_traffic_light_max}; - env->reward_bounds[REWARD_COEF_CENTER_BIAS] = - (RewardBound){reward_bound_center_bias_min, reward_bound_center_bias_max}; - env->reward_bounds[REWARD_COEF_VEL_ALIGN] = - (RewardBound){reward_bound_vel_align_min, reward_bound_vel_align_max}; - env->reward_bounds[REWARD_COEF_OVERSPEED] = - (RewardBound){reward_bound_overspeed_min, reward_bound_overspeed_max}; - env->reward_bounds[REWARD_COEF_TIMESTEP] = (RewardBound){reward_bound_timestep_min, reward_bound_timestep_max}; - env->reward_bounds[REWARD_COEF_REVERSE] = (RewardBound){reward_bound_reverse_min, reward_bound_reverse_max}; - env->reward_bounds[REWARD_COEF_THROTTLE] = (RewardBound){reward_bound_throttle_min, reward_bound_throttle_max}; - env->reward_bounds[REWARD_COEF_STEER] = (RewardBound){reward_bound_steer_min, reward_bound_steer_max}; - env->reward_bounds[REWARD_COEF_ACC] = (RewardBound){reward_bound_acc_min, reward_bound_acc_max}; - - // Get map file path from Python list - PyObject *map_file_obj = PyList_GetItem(map_files_list, map_id); - const char *map_file_path = PyUnicode_AsUTF8(map_file_obj); - load_map_binary(map_file_path, env); - set_active_agents(env); + + // Lazily populate map cache + if (g_map_cache[map_id] == NULL) { + PyObject *map_file_obj = PyList_GetItem(map_files_list, map_id); + const char *map_file_path = PyUnicode_AsUTF8(map_file_obj); + g_map_cache[map_id] = + create_shared_map_data(map_file_path, init_mode, control_mode, init_steps, observation_window_size, + polyline_reduction_threshold, polyline_max_segment_length); + } + SharedMapData *shared = g_map_cache[map_id]; + + // Count active agents using a temporary Drive (doesn't allocate map data) + Drive temp_env = {0}; + temp_env.init_mode = init_mode; + temp_env.control_mode = control_mode; + temp_env.init_steps = init_steps; + temp_env.goal_behavior = goal_behavior; + temp_env.reward_randomization = reward_randomization; + temp_env.turn_off_normalization = turn_off_normalization; + temp_env.reward_conditioning = reward_conditioning; + temp_env.agents = shared->template_agents; + temp_env.num_objects = shared->num_objects; + temp_env.sdc_track_index = shared->sdc_track_index; + temp_env.num_tracks_to_predict = shared->num_tracks_to_predict; + temp_env.tracks_to_predict_indices = shared->tracks_to_predict_indices; + set_active_agents(&temp_env); + int active_count = temp_env.active_agent_count; + + // Free the index arrays that set_active_agents allocated + free(temp_env.active_agent_indices); + free(temp_env.static_agent_indices); + free(temp_env.expert_static_agent_indices); // Skip map if it doesn't contain any controllable agents - if (env->active_agent_count == 0) { + if (active_count == 0) { maps_checked++; - - // Safeguard: if we've checked all available maps and found no active agents, raise an error if (maps_checked >= num_maps) { - for (int j = 0; j < env->num_objects; j++) { - free_agent(&env->agents[j]); - } - for (int j = 0; j < env->num_roads; j++) { - free_road_element(&env->road_elements[j]); - } - free(env->agents); - free(env->road_elements); - free(env->road_scenario_ids); - free(env->active_agent_indices); - free(env->static_agent_indices); - free(env->expert_static_agent_indices); - free(env); Py_DECREF(agent_offsets); Py_DECREF(map_ids); char error_msg[256]; @@ -264,30 +358,14 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { PyErr_SetString(PyExc_ValueError, error_msg); return NULL; } - - // Store map_id - PyObject *map_id_obj = PyLong_FromLong(map_id); - PyList_SetItem(map_ids, env_count, map_id_obj); - // Store agent offset - PyObject *offset = PyLong_FromLong(total_agent_count); - PyList_SetItem(agent_offsets, env_count, offset); - total_agent_count += env->active_agent_count; - env_count++; - for (int j = 0; j < env->num_objects; j++) { - free_agent(&env->agents[j]); - } - for (int j = 0; j < env->num_roads; j++) { - free_road_element(&env->road_elements[j]); - } - free(env->agents); - free(env->road_elements); - free(env->road_scenario_ids); - free(env->active_agent_indices); - free(env->static_agent_indices); - free(env->expert_static_agent_indices); - free(env); continue; } + + // Store map_id and agent offset + PyList_SetItem(map_ids, env_count, PyLong_FromLong(map_id)); + PyList_SetItem(agent_offsets, env_count, PyLong_FromLong(total_agent_count)); + total_agent_count += active_count; + env_count++; } if (total_agent_count >= num_agents) { @@ -408,6 +486,23 @@ static int my_init(Env *env, PyObject *args, PyObject *kwargs) { env->map_name = map_path; env->init_steps = init_steps; env->timestep = init_steps; + + // Check if map_id was provided and the map cache is populated + PyObject *map_id_obj = kwargs ? PyDict_GetItemString(kwargs, "map_id") : NULL; + if (map_id_obj != NULL && g_map_cache != NULL) { + int map_id = (int)PyLong_AsLong(map_id_obj); + if (map_id >= 0 && map_id < g_map_cache_size && g_map_cache[map_id] != NULL) { + init_from_shared(env, g_map_cache[map_id]); + return 0; + } + } + + // Fallback: load from disk (standalone use, tests, rendering, etc.) + // If map_id was provided but cache miss occurred, warn — this likely indicates a bug. + if (map_id_obj != NULL) { + fprintf(stderr, "WARNING: map_id=%d provided but cache miss — falling back to disk loading\n", + (int)PyLong_AsLong(map_id_obj)); + } init(env); return 0; } diff --git a/pufferlib/ocean/drive/drive.h b/pufferlib/ocean/drive/drive.h index fc4aa2bfc1..fa192b8ecd 100644 --- a/pufferlib/ocean/drive/drive.h +++ b/pufferlib/ocean/drive/drive.h @@ -281,6 +281,42 @@ struct AgentSpawnSettings { float h; }; +// Shared map data that can be reused across multiple environments using the same map. +// Holds road infrastructure (road_elements, grid_map, neighbor_cache) and agent templates. +// Reference-counted: freed when no env references it and the cache is released. +typedef struct SharedMapData SharedMapData; +struct SharedMapData { + // Road infrastructure (shared read-only across envs) + RoadMapElement *road_elements; + int *road_scenario_ids; + int num_roads; + + // Spatial index + GridMap *grid_map; + int *neighbor_offsets; + + // World coordinate means + float world_mean_x; + float world_mean_y; + float world_mean_z; + + // Drivable lane index (computed from road_elements, used for agent spawning) + int *drivable_lane_indices; + float *drivable_lane_lengths; + int num_drivable; + float total_drivable_lane_length; + + // Agent templates from binary (for cloning into per-env agents) + Agent *template_agents; + int num_objects; + int sdc_track_index; + int num_tracks_to_predict; + int *tracks_to_predict_indices; + + // Reference counting + int ref_count; +}; + struct Drive { Client *client; float *observations; @@ -360,6 +396,9 @@ struct Drive { int turn_off_normalization; RewardBound reward_bounds[NUM_REWARD_COEFS]; float min_avg_speed_to_consider_goal_attempt; + + // Shared map data (NULL if map data is owned by this env) + SharedMapData *shared_map; }; // ======================================== @@ -2222,9 +2261,23 @@ void remove_bad_trajectories(Drive *env) { env->timestep = 0; } +// Common post-load initialization shared by both init() and init_from_shared(). +// Called after map data and drivable lanes are ready. Spawns/configures agents, +// sets start positions, and allocates per-env logs. +void finalize_env(Drive *env) { + env->logs_capacity = 0; + set_active_agents(env); + env->logs_capacity = env->active_agent_count; + remove_bad_trajectories(env); + set_start_position(env); + init_goal_positions(env); + env->logs = (Log *)calloc(env->active_agent_count, sizeof(Log)); +} + void init(Drive *env) { env->human_agent_idx = 0; env->timestep = 0; + env->shared_map = NULL; load_map_binary(env->map_name, env); filter_road_elements(env); compute_drivable_lane_points(env); @@ -2235,45 +2288,218 @@ void init(Drive *env) { generate_offsets(z_offsets, Z_RANGE); init_neighbor_offsets(env); cache_neighbor_offsets(env); - env->logs_capacity = 0; - set_active_agents(env); - env->logs_capacity = env->active_agent_count; - remove_bad_trajectories(env); - set_start_position(env); - init_goal_positions(env); - env->logs = (Log *)calloc(env->active_agent_count, sizeof(Log)); + finalize_env(env); } -void c_close(Drive *env) { - free_agents(env->agents, env->num_objects); - for (int i = 0; i < env->num_roads; i++) { - free_road_element(&env->road_elements[i]); +// Initialize an environment using shared map data instead of loading from disk. +// Road elements, grid_map, and neighbor cache are shared (pointer copy, not duplicated). +// Agents are cloned per-env since they have mutable sim state. +void init_from_shared(Drive *env, SharedMapData *shared) { + env->human_agent_idx = 0; + env->timestep = 0; + env->shared_map = shared; + shared->ref_count++; + + // Point to shared road/grid data (read-only, not duplicated) + env->road_elements = shared->road_elements; + env->road_scenario_ids = shared->road_scenario_ids; + env->num_roads = shared->num_roads; + env->grid_map = shared->grid_map; + env->neighbor_offsets = shared->neighbor_offsets; + env->world_mean_x = shared->world_mean_x; + env->world_mean_y = shared->world_mean_y; + env->world_mean_z = shared->world_mean_z; + env->sdc_track_index = shared->sdc_track_index; + env->num_tracks_to_predict = shared->num_tracks_to_predict; + + // Copy tracks_to_predict_indices (small, per-env) + if (shared->num_tracks_to_predict > 0) { + env->tracks_to_predict_indices = (int *)malloc(shared->num_tracks_to_predict * sizeof(int)); + memcpy(env->tracks_to_predict_indices, shared->tracks_to_predict_indices, + shared->num_tracks_to_predict * sizeof(int)); + } else { + env->tracks_to_predict_indices = NULL; } - free(env->road_elements); - free(env->road_scenario_ids); - free(env->active_agent_indices); - free(env->drivable_lane_indices); - free(env->drivable_lane_lengths); - free(env->logs); - // GridMap cleanup - int grid_cell_count = env->grid_map->grid_cols * env->grid_map->grid_rows; - for (int grid_index = 0; grid_index < grid_cell_count; grid_index++) { - free(env->grid_map->cells[grid_index]); + + // Generate collision/z offsets (static globals, idempotent) + generate_offsets(collision_offsets, COLLISION_RANGE); + generate_offsets(z_offsets, Z_RANGE); + + // Point to shared drivable lane data (read-only, used by spawn_agent) + env->drivable_lane_indices = shared->drivable_lane_indices; + env->drivable_lane_lengths = shared->drivable_lane_lengths; + env->num_drivable = shared->num_drivable; + env->total_drivable_lane_length = shared->total_drivable_lane_length; + + if (env->init_mode == INIT_VARIABLE_AGENT_NUMBER) { + // Variable agent mode: agents are spawned fresh, not cloned from template. + // Set num_objects=0 so spawn_active_agents' free_agents() call is a no-op. + env->agents = NULL; + env->num_objects = 0; + } else { + // Clone agents from template (each env needs its own mutable copy) + env->num_objects = shared->num_objects; + env->agents = (Agent *)calloc(shared->num_objects, sizeof(Agent)); + for (int i = 0; i < shared->num_objects; i++) { + Agent *src = &shared->template_agents[i]; + Agent *dst = &env->agents[i]; + // Copy all scalar fields + *dst = *src; + // Deep copy trajectory arrays + allocate_agent_trajectories(dst); + memcpy(dst->log_trajectory_x, src->log_trajectory_x, src->trajectory_length * sizeof(float)); + memcpy(dst->log_trajectory_y, src->log_trajectory_y, src->trajectory_length * sizeof(float)); + memcpy(dst->log_trajectory_z, src->log_trajectory_z, src->trajectory_length * sizeof(float)); + memcpy(dst->log_velocity_x, src->log_velocity_x, src->trajectory_length * sizeof(float)); + memcpy(dst->log_velocity_y, src->log_velocity_y, src->trajectory_length * sizeof(float)); + memcpy(dst->log_heading, src->log_heading, src->trajectory_length * sizeof(float)); + memcpy(dst->log_valid, src->log_valid, src->trajectory_length * sizeof(int)); + // Route and path are per-env, start NULL + dst->route = NULL; + dst->path = NULL; + } } - free(env->grid_map->cells); - free(env->grid_map->cell_entities_count); - free(env->neighbor_offsets); + finalize_env(env); +} + +// Free a SharedMapData and all data it owns. +void free_shared_map_data(SharedMapData *shared) { + if (shared == NULL) + return; + + // Free road elements + for (int i = 0; i < shared->num_roads; i++) { + free_road_element(&shared->road_elements[i]); + } + free(shared->road_elements); + free(shared->road_scenario_ids); + + // Free grid map + int grid_cell_count = shared->grid_map->grid_cols * shared->grid_map->grid_rows; + for (int grid_index = 0; grid_index < grid_cell_count; grid_index++) { + free(shared->grid_map->cells[grid_index]); + } + free(shared->grid_map->cells); + free(shared->grid_map->cell_entities_count); + free(shared->neighbor_offsets); for (int i = 0; i < grid_cell_count; i++) { - free(env->grid_map->neighbor_cache_entities[i]); + free(shared->grid_map->neighbor_cache_entities[i]); } - free(env->grid_map->neighbor_cache_entities); - free(env->grid_map->neighbor_cache_count); - free(env->grid_map); + free(shared->grid_map->neighbor_cache_entities); + free(shared->grid_map->neighbor_cache_count); + free(shared->grid_map); + + // Free drivable lane data + free(shared->drivable_lane_indices); + free(shared->drivable_lane_lengths); + + // Free template agents + free_agents(shared->template_agents, shared->num_objects); + + // Free tracks_to_predict + free(shared->tracks_to_predict_indices); + + free(shared); +} + +// Create a SharedMapData by loading a map binary and building all spatial structures. +SharedMapData *create_shared_map_data(const char *map_file_path, int init_mode, int control_mode, int init_steps, + float observation_window_size, float polyline_reduction_threshold, + float polyline_max_segment_length) { + SharedMapData *shared = (SharedMapData *)calloc(1, sizeof(SharedMapData)); + + // Create a temporary Drive to use existing loading/processing functions. + Drive temp = {0}; + temp.init_mode = init_mode; + temp.control_mode = control_mode; + temp.init_steps = init_steps; + temp.observation_window_size = observation_window_size; + temp.polyline_reduction_threshold = polyline_reduction_threshold; + temp.polyline_max_segment_length = polyline_max_segment_length; + + // Load and process map data (same pipeline as init()) + load_map_binary(map_file_path, &temp); + filter_road_elements(&temp); + create_sparse_lane_points(&temp, polyline_reduction_threshold, polyline_max_segment_length); + compute_drivable_lane_points(&temp); + + // Transfer ownership to shared struct + shared->template_agents = temp.agents; + shared->num_objects = temp.num_objects; + shared->num_roads = temp.num_roads; + shared->road_elements = temp.road_elements; + shared->road_scenario_ids = temp.road_scenario_ids; + shared->sdc_track_index = temp.sdc_track_index; + shared->num_tracks_to_predict = temp.num_tracks_to_predict; + shared->tracks_to_predict_indices = temp.tracks_to_predict_indices; + shared->drivable_lane_indices = temp.drivable_lane_indices; + shared->drivable_lane_lengths = temp.drivable_lane_lengths; + shared->num_drivable = temp.num_drivable; + shared->total_drivable_lane_length = temp.total_drivable_lane_length; + + // Compute world means + set_means(&temp); + shared->world_mean_x = temp.world_mean_x; + shared->world_mean_y = temp.world_mean_y; + shared->world_mean_z = temp.world_mean_z; + + // Build grid map and neighbor cache + init_grid_map(&temp); + generate_offsets(collision_offsets, COLLISION_RANGE); + generate_offsets(z_offsets, Z_RANGE); + init_neighbor_offsets(&temp); + cache_neighbor_offsets(&temp); + + // Transfer grid data ownership to shared struct + shared->grid_map = temp.grid_map; + shared->neighbor_offsets = temp.neighbor_offsets; + + shared->ref_count = 0; + return shared; +} + +void c_close(Drive *env) { + // Always free per-env agent data + free_agents(env->agents, env->num_objects); + free(env->active_agent_indices); + // Only free drivable lanes if not shared (shared envs point to SharedMapData's copy) + if (env->shared_map == NULL) { + free(env->drivable_lane_indices); + free(env->drivable_lane_lengths); + } + free(env->logs); free(env->static_agent_indices); free(env->expert_static_agent_indices); free(env->tracks_to_predict_indices); free(env->ini_file); + + if (env->shared_map != NULL) { + // Map data is shared — just decrement ref count, don't free + env->shared_map->ref_count--; + env->shared_map = NULL; + } else { + // Map data is owned by this env — free it + for (int i = 0; i < env->num_roads; i++) { + free_road_element(&env->road_elements[i]); + } + free(env->road_elements); + free(env->road_scenario_ids); + + int grid_cell_count = env->grid_map->grid_cols * env->grid_map->grid_rows; + for (int grid_index = 0; grid_index < grid_cell_count; grid_index++) { + free(env->grid_map->cells[grid_index]); + } + free(env->grid_map->cells); + free(env->grid_map->cell_entities_count); + free(env->neighbor_offsets); + for (int i = 0; i < grid_cell_count; i++) { + free(env->grid_map->neighbor_cache_entities[i]); + } + free(env->grid_map->neighbor_cache_entities); + free(env->grid_map->neighbor_cache_count); + free(env->grid_map); + } } void allocate(Drive *env) { diff --git a/pufferlib/ocean/drive/drive.py b/pufferlib/ocean/drive/drive.py index 3e705f3a51..ecd0b00c47 100644 --- a/pufferlib/ocean/drive/drive.py +++ b/pufferlib/ocean/drive/drive.py @@ -344,7 +344,7 @@ def __init__( self.rewards[cur:nxt], self.terminals[cur:nxt], self.truncations[cur:nxt], - seed, + seed + i, # unique seed per sub-env so spawn positions differ action_type=self._action_type_flag, human_agent_idx=human_agent_idx, reward_vehicle_collision=reward_vehicle_collision, @@ -405,6 +405,7 @@ def __init__( episode_length=(int(episode_length) if episode_length is not None else None), termination_mode=(int(self.termination_mode) if self.termination_mode is not None else 0), map_path=self.map_files[map_ids[i]], + map_id=map_ids[i], max_agents=nxt - cur, ini_file="pufferlib/config/ocean/drive.ini", init_steps=init_steps, @@ -502,7 +503,7 @@ def resample_maps(self): self.rewards[cur:nxt], self.terminals[cur:nxt], self.truncations[cur:nxt], - seed, + seed + i, # unique seed per sub-env so spawn positions differ action_type=self._action_type_flag, human_agent_idx=self.human_agent_idx, reward_vehicle_collision=self.reward_vehicle_collision, @@ -562,6 +563,7 @@ def resample_maps(self): dt=self.dt, episode_length=(int(self.episode_length) if self.episode_length is not None else None), map_path=self.map_files[map_ids[i]], + map_id=map_ids[i], max_agents=nxt - cur, ini_file="pufferlib/config/ocean/drive.ini", init_steps=self.init_steps, diff --git a/scripts/submit_cluster.py b/scripts/submit_cluster.py index 395dc59c74..c2a41c57fe 100644 --- a/scripts/submit_cluster.py +++ b/scripts/submit_cluster.py @@ -286,13 +286,27 @@ def launch_training(args, from_config, cmd, save_dir, project_root, container_co else: os.remove(dst) os.symlink(src, dst) - # Copy pufferlib/ as a real dir tree so we can replace .so files + # Create symlink tree of pufferlib/, excluding resources/drive/binaries + # (read-only data, 60K+ map files) which is symlinked as a single dir. pufferlib_dst = os.path.join(isolated_root, "pufferlib") if os.path.islink(pufferlib_dst): os.remove(pufferlib_dst) pufferlib_src = os.path.join(project_root, "pufferlib") - subprocess.run(["cp", "-rs", pufferlib_src, pufferlib_dst], check=True) - # Copy .so files over their symlinks so they survive rebuilds + skip_dir = os.path.join(pufferlib_src, "resources", "drive", "binaries") + for dirpath, dirnames, filenames in os.walk(pufferlib_src): + if os.path.abspath(dirpath) == os.path.abspath(skip_dir): + dirnames.clear() + continue + rel = os.path.relpath(dirpath, pufferlib_src) + dst_dir = os.path.join(pufferlib_dst, rel) + os.makedirs(dst_dir, exist_ok=True) + for fname in filenames: + src_file = os.path.join(dirpath, fname) + dst_file = os.path.join(dst_dir, fname) + os.symlink(src_file, dst_file) + # Symlink binaries dir back to original (read-only, shared across runs) + os.symlink(skip_dir, os.path.join(pufferlib_dst, "resources", "drive", "binaries")) + # Copy .so files so they survive rebuilds on other branches for so_link in glob.glob(os.path.join(pufferlib_dst, "**", "*.so"), recursive=True): if os.path.islink(so_link): real_path = os.path.realpath(so_link)