Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
ced67e2
Add a Flag to build_ocean so Raylib can work on Debian 11
WaelDLZ Jan 16, 2026
77898d1
Merge pull request #263 from Emerge-Lab/wbd/debian_issue
WaelDLZ Jan 16, 2026
2dff5d2
Add shared map infrastructure to reduce memory usage
eugenevinitsky Mar 17, 2026
b9a76d4
Merge remote-tracking branch 'origin/main' into ev/reuse_maps
eugenevinitsky Mar 17, 2026
275e45c
Merge remote-tracking branch 'origin/3.0' into ev/reuse_maps
eugenevinitsky Mar 17, 2026
5962ac5
Skip binaries dir in code isolation to avoid slow NFS walks
eugenevinitsky Mar 17, 2026
c1340c1
Fix segfault: don't free inherited map cache after fork
eugenevinitsky Mar 18, 2026
4d78b82
Add debug prints to trace segfault in move_dynamics/c_step
eugenevinitsky Mar 18, 2026
e7f83ec
Fix segfault: reuse map cache across PufferDrive instances in same pr…
eugenevinitsky Mar 18, 2026
c2fa8ab
Apply clang-format to C files
eugenevinitsky Mar 18, 2026
a6299bb
Remove unused reward_bounds from create_shared_map_data
eugenevinitsky Mar 22, 2026
89b1dd3
Merge 3.0 and fix shared map cache bugs
eugenevinitsky Mar 22, 2026
8209718
Fix: call compute_drivable_lane_points in init_from_shared
eugenevinitsky Mar 22, 2026
7e19d54
Improve shared map cache robustness
eugenevinitsky Mar 22, 2026
2ba9f2b
Fix clang-format: wrap long create_shared_map_data calls
eugenevinitsky Mar 22, 2026
6442176
Fix clang-format: match exact indentation for wrapped function calls
eugenevinitsky Mar 22, 2026
64c6dea
Add map file paths to cache key, fix remaining clang-format
eugenevinitsky Mar 22, 2026
0e40487
Extract reset_cache_globals to eliminate duplicated global reset
eugenevinitsky Mar 22, 2026
376dcd2
Move drivable lanes to SharedMapData (computed once, shared across envs)
eugenevinitsky Mar 22, 2026
204ea56
Fix lifetime bug: refuse to free cache with live refs
eugenevinitsky Mar 22, 2026
49e3c9e
Fix: diversify seed per sub-env in env_init
eugenevinitsky Mar 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
265 changes: 180 additions & 85 deletions pufferlib/ocean/drive/binding.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,75 @@
#include <Python.h>
#include "drive.h"
#define Env Drive
#define MY_SHARED
#define MY_PUT

// Module-level map cache: indexed by map_id, populated by my_shared(), used by my_init().
static SharedMapData **g_map_cache = NULL;
static int g_map_cache_size = 0;
static pid_t g_map_cache_pid = 0; // PID of the process that created the cache
// Cache key: params that affect SharedMapData contents (roads, grid, polylines)
static float g_cache_observation_window_size = 0;
static float g_cache_polyline_reduction_threshold = 0;
static float g_cache_polyline_max_segment_length = 0;
static char **g_cache_map_paths = NULL; // strdup'd file paths, one per map_id

static void reset_cache_globals(void) {
g_map_cache = NULL;
g_map_cache_size = 0;
g_map_cache_pid = 0;
g_cache_observation_window_size = 0;
g_cache_polyline_reduction_threshold = 0;
g_cache_polyline_max_segment_length = 0;
g_cache_map_paths = NULL;
}

static void release_map_cache_internal(void) {
if (g_map_cache == NULL)
return;
// After fork, child inherits g_map_cache pointers via copy-on-write.
// We must NOT free them — they belong to the parent's address space.
// Discarding the pointers "leaks" the CoW pages in the child, but this
// is bounded (one map cache worth of memory per worker) and unavoidable
// with fork-based multiprocessing. The child rebuilds its own cache on
// the next my_shared() call.
if (g_map_cache_pid != 0 && g_map_cache_pid != getpid()) {
reset_cache_globals();
return;
}
for (int i = 0; i < g_map_cache_size; i++) {
if (g_map_cache[i] != NULL) {
if (g_map_cache[i]->ref_count > 0) {
fprintf(stderr,
"ERROR: cannot release map cache — entry %d still has %d live env(s). "
"Close all Drive instances before changing map config.\n",
i, g_map_cache[i]->ref_count);
return; // Refuse to free — callers must close envs first
}
}
}
for (int i = 0; i < g_map_cache_size; i++) {
if (g_map_cache[i] != NULL) {
free_shared_map_data(g_map_cache[i]);
}
}
free(g_map_cache);
if (g_cache_map_paths != NULL) {
for (int i = 0; i < g_map_cache_size; i++) {
free(g_cache_map_paths[i]);
}
free(g_cache_map_paths);
}
reset_cache_globals();
}

static PyObject *release_map_cache_py(PyObject *self, PyObject *args) {
release_map_cache_internal();
Py_RETURN_NONE;
}

#define MY_METHODS {"release_map_cache", release_map_cache_py, METH_VARARGS, "Release the shared map data cache"}

#include "../env_binding.h"

static int my_put(Env *env, PyObject *args, PyObject *kwargs) {
Expand Down Expand Up @@ -137,6 +205,50 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) {
clock_gettime(CLOCK_REALTIME, &ts);
srand(ts.tv_nsec);

// Reuse existing cache if it was created by this process with matching config.
// The cache key includes: PID, num_maps, map file paths, and params that
// affect SharedMapData (observation_window_size for grid, polyline params for
// road simplification). Other params (init_mode, reward bounds, etc.) only
// affect per-env state and don't change the shared map data.
int reuse_cache = (g_map_cache != NULL && g_map_cache_pid == getpid() && g_map_cache_size == num_maps &&
g_cache_observation_window_size == observation_window_size &&
g_cache_polyline_reduction_threshold == polyline_reduction_threshold &&
g_cache_polyline_max_segment_length == polyline_max_segment_length);
// Also check that map file paths haven't changed
if (reuse_cache && g_cache_map_paths != NULL) {
for (int i = 0; i < num_maps; i++) {
const char *path = PyUnicode_AsUTF8(PyList_GetItem(map_files_list, i));
if (g_cache_map_paths[i] == NULL || strcmp(g_cache_map_paths[i], path) != 0) {
reuse_cache = 0;
break;
}
}
}

if (!reuse_cache) {
release_map_cache_internal();
if (g_map_cache != NULL) {
// release_map_cache_internal refused to free because envs are still alive.
// Cannot change map config while Drive instances exist.
PyErr_SetString(PyExc_RuntimeError,
"Cannot change map cache config while Drive environments are still open. "
"Call close() on all Drive instances first.");
return NULL;
}
g_map_cache_size = num_maps;
g_map_cache = (SharedMapData **)calloc(num_maps, sizeof(SharedMapData *));
g_map_cache_pid = getpid();
g_cache_observation_window_size = observation_window_size;
g_cache_polyline_reduction_threshold = polyline_reduction_threshold;
g_cache_polyline_max_segment_length = polyline_max_segment_length;
// Store map file paths for future reuse checks
g_cache_map_paths = (char **)calloc(num_maps, sizeof(char *));
for (int i = 0; i < num_maps; i++) {
const char *path = PyUnicode_AsUTF8(PyList_GetItem(map_files_list, i));
g_cache_map_paths[i] = strdup(path);
}
}

int max_envs = use_all_maps ? num_maps : num_agents;

if (init_mode == INIT_VARIABLE_AGENT_NUMBER) {
Expand Down Expand Up @@ -168,9 +280,19 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) {

int offset = 0;
for (int i = 0; i < env_count; i++) {
int map_id = rand() % num_maps;
PyList_SetItem(agent_offsets, i, PyLong_FromLong(offset));
PyList_SetItem(map_ids_list, i, PyLong_FromLong(rand() % num_maps));
PyList_SetItem(map_ids_list, i, PyLong_FromLong(map_id));
offset += agent_counts[i];

// Lazily populate map cache for assigned maps
if (g_map_cache[map_id] == NULL) {
PyObject *map_file_obj = PyList_GetItem(map_files_list, map_id);
const char *map_file_path = PyUnicode_AsUTF8(map_file_obj);
g_map_cache[map_id] =
create_shared_map_data(map_file_path, init_mode, control_mode, init_steps, observation_window_size,
polyline_reduction_threshold, polyline_max_segment_length);
}
}
PyList_SetItem(agent_offsets, env_count,
PyLong_FromLong(num_agents)); // In random mode, we guarantee num_agents accross all envs
Expand All @@ -189,105 +311,61 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) {
PyObject *agent_offsets = PyList_New(max_envs + 1);
PyObject *map_ids = PyList_New(max_envs);

// getting env count
// getting env count — use cached SharedMapData to count agents instead of loading per-env
while (use_all_maps ? map_idx < max_envs : total_agent_count < num_agents && env_count < max_envs) {
int map_id = use_all_maps ? map_idx++ : rand() % num_maps;
Drive *env = calloc(1, sizeof(Drive));
env->init_mode = init_mode;
env->control_mode = control_mode;
env->init_steps = init_steps;
env->goal_behavior = goal_behavior;
env->reward_randomization = reward_randomization;
env->turn_off_normalization = turn_off_normalization;
env->reward_conditioning = reward_conditioning;
env->min_goal_distance = min_goal_distance;
env->max_goal_distance = max_goal_distance;
env->observation_window_size = observation_window_size;
env->polyline_reduction_threshold = polyline_reduction_threshold;
env->polyline_max_segment_length = polyline_max_segment_length;
env->min_avg_speed_to_consider_goal_attempt = min_avg_speed_to_consider_goal_attempt;
// reward randomization bounds
env->reward_bounds[REWARD_COEF_GOAL_RADIUS] =
(RewardBound){reward_bound_goal_radius_min, reward_bound_goal_radius_max};
env->reward_bounds[REWARD_COEF_COLLISION] =
(RewardBound){reward_bound_collision_min, reward_bound_collision_max};
env->reward_bounds[REWARD_COEF_OFFROAD] = (RewardBound){reward_bound_offroad_min, reward_bound_offroad_max};
env->reward_bounds[REWARD_COEF_COMFORT] = (RewardBound){reward_bound_comfort_min, reward_bound_comfort_max};
env->reward_bounds[REWARD_COEF_LANE_ALIGN] =
(RewardBound){reward_bound_lane_align_min, reward_bound_lane_align_max};
env->reward_bounds[REWARD_COEF_LANE_CENTER] =
(RewardBound){reward_bound_lane_center_min, reward_bound_lane_center_max};
env->reward_bounds[REWARD_COEF_VELOCITY] = (RewardBound){reward_bound_velocity_min, reward_bound_velocity_max};
env->reward_bounds[REWARD_COEF_TRAFFIC_LIGHT] =
(RewardBound){reward_bound_traffic_light_min, reward_bound_traffic_light_max};
env->reward_bounds[REWARD_COEF_CENTER_BIAS] =
(RewardBound){reward_bound_center_bias_min, reward_bound_center_bias_max};
env->reward_bounds[REWARD_COEF_VEL_ALIGN] =
(RewardBound){reward_bound_vel_align_min, reward_bound_vel_align_max};
env->reward_bounds[REWARD_COEF_OVERSPEED] =
(RewardBound){reward_bound_overspeed_min, reward_bound_overspeed_max};
env->reward_bounds[REWARD_COEF_TIMESTEP] = (RewardBound){reward_bound_timestep_min, reward_bound_timestep_max};
env->reward_bounds[REWARD_COEF_REVERSE] = (RewardBound){reward_bound_reverse_min, reward_bound_reverse_max};
env->reward_bounds[REWARD_COEF_THROTTLE] = (RewardBound){reward_bound_throttle_min, reward_bound_throttle_max};
env->reward_bounds[REWARD_COEF_STEER] = (RewardBound){reward_bound_steer_min, reward_bound_steer_max};
env->reward_bounds[REWARD_COEF_ACC] = (RewardBound){reward_bound_acc_min, reward_bound_acc_max};

// Get map file path from Python list
PyObject *map_file_obj = PyList_GetItem(map_files_list, map_id);
const char *map_file_path = PyUnicode_AsUTF8(map_file_obj);
load_map_binary(map_file_path, env);
set_active_agents(env);

// Lazily populate map cache
if (g_map_cache[map_id] == NULL) {
PyObject *map_file_obj = PyList_GetItem(map_files_list, map_id);
const char *map_file_path = PyUnicode_AsUTF8(map_file_obj);
g_map_cache[map_id] =
create_shared_map_data(map_file_path, init_mode, control_mode, init_steps, observation_window_size,
polyline_reduction_threshold, polyline_max_segment_length);
}
SharedMapData *shared = g_map_cache[map_id];

// Count active agents using a temporary Drive (doesn't allocate map data)
Drive temp_env = {0};
temp_env.init_mode = init_mode;
temp_env.control_mode = control_mode;
temp_env.init_steps = init_steps;
temp_env.goal_behavior = goal_behavior;
temp_env.reward_randomization = reward_randomization;
temp_env.turn_off_normalization = turn_off_normalization;
temp_env.reward_conditioning = reward_conditioning;
temp_env.agents = shared->template_agents;
temp_env.num_objects = shared->num_objects;
temp_env.sdc_track_index = shared->sdc_track_index;
temp_env.num_tracks_to_predict = shared->num_tracks_to_predict;
temp_env.tracks_to_predict_indices = shared->tracks_to_predict_indices;
set_active_agents(&temp_env);
int active_count = temp_env.active_agent_count;

// Free the index arrays that set_active_agents allocated
free(temp_env.active_agent_indices);
free(temp_env.static_agent_indices);
free(temp_env.expert_static_agent_indices);

// Skip map if it doesn't contain any controllable agents
if (env->active_agent_count == 0) {
if (active_count == 0) {
maps_checked++;

// Safeguard: if we've checked all available maps and found no active agents, raise an error
if (maps_checked >= num_maps) {
for (int j = 0; j < env->num_objects; j++) {
free_agent(&env->agents[j]);
}
for (int j = 0; j < env->num_roads; j++) {
free_road_element(&env->road_elements[j]);
}
free(env->agents);
free(env->road_elements);
free(env->road_scenario_ids);
free(env->active_agent_indices);
free(env->static_agent_indices);
free(env->expert_static_agent_indices);
free(env);
Py_DECREF(agent_offsets);
Py_DECREF(map_ids);
char error_msg[256];
sprintf(error_msg, "No controllable agents found in any of the %d available maps", num_maps);
PyErr_SetString(PyExc_ValueError, error_msg);
return NULL;
}

// Store map_id
PyObject *map_id_obj = PyLong_FromLong(map_id);
PyList_SetItem(map_ids, env_count, map_id_obj);
// Store agent offset
PyObject *offset = PyLong_FromLong(total_agent_count);
PyList_SetItem(agent_offsets, env_count, offset);
total_agent_count += env->active_agent_count;
env_count++;
for (int j = 0; j < env->num_objects; j++) {
free_agent(&env->agents[j]);
}
for (int j = 0; j < env->num_roads; j++) {
free_road_element(&env->road_elements[j]);
}
free(env->agents);
free(env->road_elements);
free(env->road_scenario_ids);
free(env->active_agent_indices);
free(env->static_agent_indices);
free(env->expert_static_agent_indices);
free(env);
continue;
}

// Store map_id and agent offset
PyList_SetItem(map_ids, env_count, PyLong_FromLong(map_id));
PyList_SetItem(agent_offsets, env_count, PyLong_FromLong(total_agent_count));
total_agent_count += active_count;
env_count++;
}

if (total_agent_count >= num_agents) {
Expand Down Expand Up @@ -408,6 +486,23 @@ static int my_init(Env *env, PyObject *args, PyObject *kwargs) {
env->map_name = map_path;
env->init_steps = init_steps;
env->timestep = init_steps;

// Check if map_id was provided and the map cache is populated
PyObject *map_id_obj = kwargs ? PyDict_GetItemString(kwargs, "map_id") : NULL;
if (map_id_obj != NULL && g_map_cache != NULL) {
int map_id = (int)PyLong_AsLong(map_id_obj);
if (map_id >= 0 && map_id < g_map_cache_size && g_map_cache[map_id] != NULL) {
init_from_shared(env, g_map_cache[map_id]);
return 0;
}
}

// Fallback: load from disk (standalone use, tests, rendering, etc.)
// If map_id was provided but cache miss occurred, warn — this likely indicates a bug.
if (map_id_obj != NULL) {
fprintf(stderr, "WARNING: map_id=%d provided but cache miss — falling back to disk loading\n",
(int)PyLong_AsLong(map_id_obj));
}
init(env);
return 0;
}
Expand Down
Loading
Loading