diff --git a/c_src/py_nif.c b/c_src/py_nif.c index 8d88ddc..8f3c35e 100644 --- a/c_src/py_nif.c +++ b/c_src/py_nif.c @@ -63,8 +63,94 @@ ErlNifResourceType *PY_REF_RESOURCE_TYPE = NULL; /* suspended_context_state_t resource type (context suspension for callbacks) */ ErlNifResourceType *PY_CONTEXT_SUSPENDED_RESOURCE_TYPE = NULL; +/* Process-local Python environment resource type */ +ErlNifResourceType *PY_ENV_RESOURCE_TYPE = NULL; + _Atomic uint32_t g_context_id_counter = 1; +/* ============================================================================ + * Process-local Python Environment + * ============================================================================ + * Each Erlang process can have its own Python globals/locals dict via a NIF + * resource stored in the process dictionary. When the process exits, the + * resource destructor frees the Python dicts. + */ + +/** + * @struct py_env_resource_t + * @brief Process-local Python environment (globals/locals) + * + * Stored in process dictionary as py_local_env. When the process exits, + * Erlang GC drops the reference, triggering the destructor which frees + * the Python dicts. + * + * Each env is bound to a specific interpreter (identified by interp_id). + * The dicts must be freed in the same interpreter that created them. + */ +typedef struct { + /** @brief Global namespace dictionary */ + PyObject *globals; + /** @brief Local namespace dictionary (same as globals for module-level execution) */ + PyObject *locals; + /** @brief Interpreter ID that owns these dicts (0 = main interpreter) */ + int64_t interp_id; + /** @brief Pool slot index (-1 for main interpreter) */ + int pool_slot; +} py_env_resource_t; + +/** + * @brief Destructor for py_env_resource_t + * + * Called when the resource reference is garbage collected (process exits). + * Acquires GIL and decrefs the Python dicts. + * + * For subinterpreters, we must DECREF in the correct interpreter context. 
+ * If the interpreter was destroyed (context freed), we skip DECREF since + * the objects were already freed with the interpreter. + */ +static void py_env_resource_dtor(ErlNifEnv *env, void *obj) { + (void)env; + py_env_resource_t *res = (py_env_resource_t *)obj; + + if (!runtime_is_running()) { + res->globals = NULL; + res->locals = NULL; + return; + } + + PyGILState_STATE gstate = PyGILState_Ensure(); + +#ifdef HAVE_SUBINTERPRETERS + if (res->pool_slot >= 0) { + /* Created in a subinterpreter - must DECREF in correct interpreter */ + subinterp_slot_t *slot = subinterp_pool_get(res->pool_slot); + + /* Verify slot is still valid and has same interpreter */ + if (slot != NULL && slot->initialized && slot->interp != NULL) { + int64_t slot_interp_id = PyInterpreterState_GetID(slot->interp); + if (slot_interp_id == res->interp_id) { + /* Same interpreter, safe to DECREF */ + PyThreadState *saved = PyThreadState_Swap(slot->tstate); + Py_XDECREF(res->globals); + Py_XDECREF(res->locals); + PyThreadState_Swap(saved); + } + /* If interp_id mismatch, slot was reused - skip DECREF */ + } + /* If slot invalid/not initialized, interpreter destroyed - skip DECREF */ + } else +#endif + { + /* Main interpreter */ + Py_XDECREF(res->globals); + Py_XDECREF(res->locals); + } + + PyGILState_Release(gstate); + res->globals = NULL; + res->locals = NULL; +} + /* Invariant counters for debugging and leak detection */ py_invariant_counters_t g_counters = {0}; @@ -2525,6 +2611,478 @@ static ERL_NIF_TERM nif_context_exec(ErlNifEnv *env, int argc, const ERL_NIF_TER return result; } +/* ============================================================================ + * Process-local Environment NIFs + * ============================================================================ */ + +/* Thread-local variable to track current local env during reentrant calls */ +__thread py_env_resource_t *tl_current_local_env = NULL; + +/** + * @brief Create a new process-local Python environment + * + * 
nif_create_local_env(ContextRef) -> {ok, EnvRef} | {error, Reason} + * + * Creates a new Python globals/locals dict pair for use as a process-local + * environment. The dicts are created inside the context's interpreter to + * ensure correct memory allocator is used. + * + * The returned resource should be stored in the process dictionary, keyed + * by the interpreter ID. + */ +static ERL_NIF_TERM nif_create_local_env(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + (void)argc; + py_context_t *ctx; + + if (!runtime_is_running()) { + return make_error(env, "python_not_running"); + } + + if (!enif_get_resource(env, argv[0], PY_CONTEXT_RESOURCE_TYPE, (void **)&ctx)) { + return make_error(env, "invalid_context"); + } + + py_env_resource_t *res = enif_alloc_resource(PY_ENV_RESOURCE_TYPE, + sizeof(py_env_resource_t)); + if (res == NULL) { + return make_error(env, "alloc_failed"); + } + + res->globals = NULL; + res->locals = NULL; + res->interp_id = 0; + res->pool_slot = -1; + + /* Acquire context to switch to correct interpreter */ + py_context_guard_t guard = py_context_acquire(ctx); + if (!guard.acquired) { + enif_release_resource(res); + return make_error(env, "acquire_failed"); + } + + /* Store interpreter info for destructor */ +#ifdef HAVE_SUBINTERPRETERS + if (ctx->is_subinterp && ctx->pool_slot >= 0) { + res->pool_slot = ctx->pool_slot; + PyInterpreterState *interp = PyInterpreterState_Get(); + res->interp_id = PyInterpreterState_GetID(interp); + } +#endif + + /* Create globals dict with builtins and erlang module */ + res->globals = PyDict_New(); + if (res->globals == NULL) { + py_context_release(&guard); + enif_release_resource(res); + return make_error(env, "globals_failed"); + } + + /* Add __builtins__ */ + PyObject *builtins = PyEval_GetBuiltins(); + if (builtins != NULL) { + PyDict_SetItemString(res->globals, "__builtins__", builtins); + } + + /* Add __name__ = '__main__' so defined functions are accessible via __main__ */ + PyObject *main_name = 
PyUnicode_FromString("__main__"); + if (main_name != NULL) { + PyDict_SetItemString(res->globals, "__name__", main_name); + Py_DECREF(main_name); + } + + /* Add erlang module */ + PyObject *erlang = PyImport_ImportModule("erlang"); + if (erlang != NULL) { + PyDict_SetItemString(res->globals, "erlang", erlang); + Py_DECREF(erlang); + } + + /* Use the same dict for locals (module-level execution) */ + res->locals = res->globals; + Py_INCREF(res->locals); + + py_context_release(&guard); + + ERL_NIF_TERM ref = enif_make_resource(env, res); + enif_release_resource(res); /* Ref now owns it */ + + return enif_make_tuple2(env, ATOM_OK, ref); +} + +/** + * @brief Execute Python statements using a process-local environment + * + * nif_context_exec_with_env(ContextRef, Code, EnvRef) -> ok | {error, Reason} + * + * In worker mode, uses the process-local environment's globals/locals. + * In subinterpreter mode, the EnvRef is ignored (each subinterp is isolated). + * + * The tl_current_local_env thread-local is set during execution to support + * reentrant calls - when Python calls erlang.call() which calls back to Python, + * the same environment is used. 
+ */ +static ERL_NIF_TERM nif_context_exec_with_env(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + (void)argc; + py_context_t *ctx; + py_env_resource_t *penv; + + if (!runtime_is_running()) { + return make_error(env, "python_not_running"); + } + + if (!enif_get_resource(env, argv[0], PY_CONTEXT_RESOURCE_TYPE, (void **)&ctx)) { + return make_error(env, "invalid_context"); + } + + ErlNifBinary code_bin; + if (!enif_inspect_binary(env, argv[1], &code_bin)) { + return make_error(env, "invalid_code"); + } + + /* Get process-local environment */ + if (!enif_get_resource(env, argv[2], PY_ENV_RESOURCE_TYPE, (void **)&penv)) { + return make_error(env, "invalid_env"); + } + + char *code = binary_to_string(&code_bin); + if (code == NULL) { + return make_error(env, "alloc_failed"); + } + + ERL_NIF_TERM result; + + /* Acquire thread state */ + py_context_guard_t guard = py_context_acquire(ctx); + if (!guard.acquired) { + enif_free(code); + return make_error(env, "acquire_failed"); + } + + /* Set thread-local context and env for callback/reentrant support */ + py_context_t *prev_context = tl_current_context; + tl_current_context = ctx; + py_env_resource_t *prev_local_env = tl_current_local_env; + tl_current_local_env = penv; + + /* Always use process-local environment */ + PyObject *exec_globals = penv->globals; + PyObject *exec_locals = penv->globals; + + /* Execute statements */ + PyObject *py_result = PyRun_String(code, Py_file_input, exec_globals, exec_locals); + + if (py_result == NULL) { + result = make_py_error(env); + } else { + Py_DECREF(py_result); + result = ATOM_OK; + } + + /* Restore thread-local state */ + tl_current_context = prev_context; + tl_current_local_env = prev_local_env; + + enif_free(code); + py_context_release(&guard); + + return result; +} + +/** + * @brief Evaluate a Python expression using a process-local environment + * + * nif_context_eval_with_env(ContextRef, Code, Locals, EnvRef) -> {ok, Result} | {error, Reason} + * + * In worker mode, 
uses the process-local environment's globals/locals. + * In subinterpreter mode, the EnvRef is ignored (each subinterp is isolated). + */ +static ERL_NIF_TERM nif_context_eval_with_env(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + (void)argc; + py_context_t *ctx; + py_env_resource_t *penv; + + if (!runtime_is_running()) { + return make_error(env, "python_not_running"); + } + + if (!enif_get_resource(env, argv[0], PY_CONTEXT_RESOURCE_TYPE, (void **)&ctx)) { + return make_error(env, "invalid_context"); + } + + ErlNifBinary code_bin; + if (!enif_inspect_binary(env, argv[1], &code_bin)) { + return make_error(env, "invalid_code"); + } + + /* Get process-local environment (argv[3]) */ + if (!enif_get_resource(env, argv[3], PY_ENV_RESOURCE_TYPE, (void **)&penv)) { + return make_error(env, "invalid_env"); + } + + char *code = binary_to_string(&code_bin); + if (code == NULL) { + return make_error(env, "alloc_failed"); + } + + ERL_NIF_TERM result; + + /* Acquire thread state */ + py_context_guard_t guard = py_context_acquire(ctx); + if (!guard.acquired) { + enif_free(code); + return make_error(env, "acquire_failed"); + } + + /* Set thread-local context and env for callback/reentrant support */ + py_context_t *prev_context = tl_current_context; + tl_current_context = ctx; + py_env_resource_t *prev_local_env = tl_current_local_env; + tl_current_local_env = penv; + + /* Enable suspension for callback support */ + bool prev_allow_suspension = tl_allow_suspension; + tl_allow_suspension = true; + + /* Always use process-local environment */ + PyObject *eval_globals = penv->globals; + + /* Build locals dict from Erlang map (if provided) */ + PyObject *eval_locals = PyDict_Copy(eval_globals); + if (enif_is_map(env, argv[2])) { + ErlNifMapIterator iter; + ERL_NIF_TERM key, value; + + enif_map_iterator_create(env, argv[2], &iter, ERL_NIF_MAP_ITERATOR_FIRST); + while (enif_map_iterator_get_pair(env, &iter, &key, &value)) { + PyObject *py_key = term_to_py(env, key); + 
PyObject *py_value = term_to_py(env, value); + if (py_key != NULL && py_value != NULL) { + PyDict_SetItem(eval_locals, py_key, py_value); + } + Py_XDECREF(py_key); + Py_XDECREF(py_value); + enif_map_iterator_next(env, &iter); + } + enif_map_iterator_destroy(env, &iter); + } + + /* Evaluate expression */ + PyObject *py_result = PyRun_String(code, Py_eval_input, eval_globals, eval_locals); + Py_DECREF(eval_locals); + + if (py_result == NULL) { + /* Check for pending callback (flag-based detection) */ + if (tl_pending_callback) { + PyErr_Clear(); + /* Create suspended state for callback handling */ + suspended_context_state_t *suspended = create_suspended_context_state_for_eval( + env, ctx, &code_bin, argv[2]); + if (suspended == NULL) { + tl_pending_callback = false; + Py_CLEAR(tl_pending_args); + result = make_error(env, "create_suspended_state_failed"); + } else { + result = build_suspended_context_result(env, suspended); + } + } else { + result = make_py_error(env); + } + } else if (is_schedule_marker(py_result)) { + /* Schedule marker: release dirty scheduler, continue via callback */ + ScheduleMarkerObject *marker = (ScheduleMarkerObject *)py_result; + ERL_NIF_TERM callback_name = py_to_term(env, marker->callback_name); + ERL_NIF_TERM callback_args = py_to_term(env, marker->args); + Py_DECREF(py_result); + result = enif_make_tuple3(env, ATOM_SCHEDULE, callback_name, callback_args); + } else { + ERL_NIF_TERM term_result = py_to_term(env, py_result); + Py_DECREF(py_result); + result = enif_make_tuple2(env, ATOM_OK, term_result); + } + + /* Restore thread-local state */ + tl_allow_suspension = prev_allow_suspension; + tl_current_context = prev_context; + tl_current_local_env = prev_local_env; + + clear_pending_callback_tls(); + enif_free(code); + py_context_release(&guard); + + return result; +} + +/** + * @brief Call a Python function using a process-local environment + * + * nif_context_call_with_env(ContextRef, Module, Func, Args, Kwargs, EnvRef) -> {ok, Result} 
| {error, Reason} + * + * In worker mode, uses the process-local environment's globals for module lookup. + * In subinterpreter mode, the EnvRef is ignored (each subinterp is isolated). + * + * For __main__ module, functions defined via exec() in the process-local env + * are accessible. + */ +static ERL_NIF_TERM nif_context_call_with_env(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + py_context_t *ctx; + py_env_resource_t *penv; + + if (!runtime_is_running()) { + return make_error(env, "python_not_running"); + } + + if (!enif_get_resource(env, argv[0], PY_CONTEXT_RESOURCE_TYPE, (void **)&ctx)) { + return make_error(env, "invalid_context"); + } + + ErlNifBinary module_bin, func_bin; + if (!enif_inspect_binary(env, argv[1], &module_bin)) { + return make_error(env, "invalid_module"); + } + if (!enif_inspect_binary(env, argv[2], &func_bin)) { + return make_error(env, "invalid_func"); + } + + /* Get process-local environment (argv[5]) */ + if (!enif_get_resource(env, argv[5], PY_ENV_RESOURCE_TYPE, (void **)&penv)) { + return make_error(env, "invalid_env"); + } + + char *module_name = binary_to_string(&module_bin); + char *func_name = binary_to_string(&func_bin); + if (module_name == NULL || func_name == NULL) { + enif_free(module_name); + enif_free(func_name); + return make_error(env, "alloc_failed"); + } + + ERL_NIF_TERM result; + + /* Acquire thread state */ + py_context_guard_t guard = py_context_acquire(ctx); + if (!guard.acquired) { + enif_free(module_name); + enif_free(func_name); + return make_error(env, "acquire_failed"); + } + + /* Set thread-local context and env for callback/reentrant support */ + py_context_t *prev_context = tl_current_context; + tl_current_context = ctx; + py_env_resource_t *prev_local_env = tl_current_local_env; + tl_current_local_env = penv; + + /* Enable suspension for callback support */ + bool prev_allow_suspension = tl_allow_suspension; + tl_allow_suspension = true; + + /* Always use process-local environment */ + PyObject 
*lookup_globals = penv->globals; + + PyObject *module = NULL; + PyObject *func = NULL; + + /* Special handling for __main__ module - look up in process-local globals */ + if (strcmp(module_name, "__main__") == 0) { + func = PyDict_GetItemString(lookup_globals, func_name); /* Borrowed ref */ + if (func != NULL) { + Py_INCREF(func); + } + } + + if (func == NULL) { + /* Get or import module from context cache */ + module = context_get_module(ctx, module_name); + if (module == NULL) { + result = make_py_error(env); + goto cleanup; + } + + /* Get function */ + func = PyObject_GetAttrString(module, func_name); + if (func == NULL) { + result = make_py_error(env); + goto cleanup; + } + } + + /* Convert args */ + unsigned int args_len; + if (!enif_get_list_length(env, argv[3], &args_len)) { + Py_DECREF(func); + result = make_error(env, "invalid_args"); + goto cleanup; + } + + PyObject *args = PyTuple_New(args_len); + ERL_NIF_TERM head, tail = argv[3]; + for (unsigned int i = 0; i < args_len; i++) { + enif_get_list_cell(env, tail, &head, &tail); + PyObject *arg = term_to_py(env, head); + if (arg == NULL) { + Py_DECREF(args); + Py_DECREF(func); + result = make_error(env, "arg_conversion_failed"); + goto cleanup; + } + PyTuple_SET_ITEM(args, i, arg); + } + + /* Convert kwargs */ + PyObject *kwargs = NULL; + if (argc > 4 && enif_is_map(env, argv[4])) { + kwargs = term_to_py(env, argv[4]); + } + + /* Call the function */ + PyObject *py_result = PyObject_Call(func, args, kwargs); + Py_DECREF(func); + Py_DECREF(args); + Py_XDECREF(kwargs); + + if (py_result == NULL) { + /* Check for pending callback */ + if (tl_pending_callback) { + PyErr_Clear(); + suspended_context_state_t *suspended = create_suspended_context_state_for_call( + env, ctx, &module_bin, &func_bin, argv[3], + argc > 4 ? 
argv[4] : enif_make_new_map(env)); + if (suspended == NULL) { + tl_pending_callback = false; + Py_CLEAR(tl_pending_args); + result = make_error(env, "create_suspended_state_failed"); + } else { + result = build_suspended_context_result(env, suspended); + } + } else { + result = make_py_error(env); + } + } else if (is_schedule_marker(py_result)) { + ScheduleMarkerObject *marker = (ScheduleMarkerObject *)py_result; + ERL_NIF_TERM callback_name = py_to_term(env, marker->callback_name); + ERL_NIF_TERM callback_args = py_to_term(env, marker->args); + Py_DECREF(py_result); + result = enif_make_tuple3(env, ATOM_SCHEDULE, callback_name, callback_args); + } else { + ERL_NIF_TERM term_result = py_to_term(env, py_result); + Py_DECREF(py_result); + result = enif_make_tuple2(env, ATOM_OK, term_result); + } + +cleanup: + /* Restore thread-local state */ + tl_allow_suspension = prev_allow_suspension; + tl_current_context = prev_context; + tl_current_local_env = prev_local_env; + + clear_pending_callback_tls(); + enif_free(module_name); + enif_free(func_name); + py_context_release(&guard); + + return result; +} + /** * @brief Call a method on a Python object in a context * @@ -3661,10 +4219,16 @@ static int load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) { env, NULL, "py_context_suspended", suspended_context_state_destructor, ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER, NULL); + /* Process-local environment resource type */ + PY_ENV_RESOURCE_TYPE = enif_open_resource_type( + env, NULL, "py_env", py_env_resource_dtor, + ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER, NULL); + if (WORKER_RESOURCE_TYPE == NULL || PYOBJ_RESOURCE_TYPE == NULL || SUSPENDED_STATE_RESOURCE_TYPE == NULL || PY_CONTEXT_RESOURCE_TYPE == NULL || PY_REF_RESOURCE_TYPE == NULL || - PY_CONTEXT_SUSPENDED_RESOURCE_TYPE == NULL) { + PY_CONTEXT_SUSPENDED_RESOURCE_TYPE == NULL || + PY_ENV_RESOURCE_TYPE == NULL) { return -1; } #ifdef HAVE_SUBINTERPRETERS @@ -3945,6 +4509,10 @@ static ErlNifFunc nif_funcs[] = { 
{"context_call", 5, nif_context_call, ERL_NIF_DIRTY_JOB_CPU_BOUND}, {"context_eval", 3, nif_context_eval, ERL_NIF_DIRTY_JOB_CPU_BOUND}, {"context_exec", 2, nif_context_exec, ERL_NIF_DIRTY_JOB_CPU_BOUND}, + {"context_exec", 3, nif_context_exec_with_env, ERL_NIF_DIRTY_JOB_CPU_BOUND}, + {"context_eval", 4, nif_context_eval_with_env, ERL_NIF_DIRTY_JOB_CPU_BOUND}, + {"context_call", 6, nif_context_call_with_env, ERL_NIF_DIRTY_JOB_CPU_BOUND}, + {"create_local_env", 1, nif_create_local_env, 0}, {"context_call_method", 4, nif_context_call_method, ERL_NIF_DIRTY_JOB_CPU_BOUND}, {"context_to_term", 1, nif_context_to_term, 0}, {"context_interp_id", 1, nif_context_interp_id, 0}, diff --git a/docker/Dockerfile.asan b/docker/Dockerfile.asan new file mode 100644 index 0000000..34550c3 --- /dev/null +++ b/docker/Dockerfile.asan @@ -0,0 +1,75 @@ +# Dockerfile for testing with Python ASAN (Address Sanitizer) +FROM erlang:27-slim + +# Install build dependencies +RUN apt-get update && apt-get install -y \ + build-essential \ + cmake \ + git \ + curl \ + wget \ + libssl-dev \ + zlib1g-dev \ + libbz2-dev \ + libreadline-dev \ + libsqlite3-dev \ + libncurses5-dev \ + libncursesw5-dev \ + xz-utils \ + tk-dev \ + libffi-dev \ + liblzma-dev \ + clang \ + llvm \ + && rm -rf /var/lib/apt/lists/* + +# Build Python 3.13 with ASAN enabled +ARG PYTHON_VERSION=3.13.12 +RUN cd /tmp && \ + wget https://www.python.org/ftp/python/${PYTHON_VERSION}/Python-${PYTHON_VERSION}.tar.xz && \ + tar xf Python-${PYTHON_VERSION}.tar.xz && \ + cd Python-${PYTHON_VERSION} && \ + CC=clang CXX=clang++ \ + CFLAGS="-fsanitize=address -fno-omit-frame-pointer -g" \ + LDFLAGS="-fsanitize=address" \ + ./configure --prefix=/usr/local \ + --with-pydebug \ + --with-address-sanitizer \ + --enable-shared \ + LDFLAGS="-Wl,-rpath,/usr/local/lib -fsanitize=address" && \ + make -j$(nproc) && \ + make install && \ + cd / && rm -rf /tmp/Python-* + +# Verify Python installation +RUN python3 --version + +# Set environment for rebar3 and 
ASAN +ENV PYTHON_CONFIG=/usr/local/bin/python3-config +ENV LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH +ENV ASAN_OPTIONS=detect_leaks=0:abort_on_error=1:print_stats=1 + +# Set compiler to clang with ASAN for NIF compilation via CMake variables +ENV CC=clang +ENV CXX=clang++ +ENV CMAKE_C_FLAGS="-fsanitize=address -fno-omit-frame-pointer -g -O1" +ENV CMAKE_CXX_FLAGS="-fsanitize=address -fno-omit-frame-pointer -g -O1" +ENV CMAKE_EXE_LINKER_FLAGS="-fsanitize=address" +ENV CMAKE_SHARED_LINKER_FLAGS="-fsanitize=address" + +# Create working directory +WORKDIR /app + +# Copy source +COPY . /app + +# Remove problematic plugin +RUN sed -i '/rebar3_ex_doc/d' rebar.config || true + +# Set ASAN_BUILD for the build hooks +ENV ASAN_BUILD=1 + +# Run tests - do_cmake.sh will use ASAN flags via ASAN_BUILD env var +# Preload ASAN runtime to ensure symbol compatibility with ASAN-built Python +CMD export LD_PRELOAD=$(clang -print-file-name=libclang_rt.asan-x86_64.so) && \ + rm -rf _build && rebar3 ct --readable=compact diff --git a/docker/Dockerfile.python312 b/docker/Dockerfile.python312 new file mode 100644 index 0000000..4be960c --- /dev/null +++ b/docker/Dockerfile.python312 @@ -0,0 +1,83 @@ +# Dockerfile for testing with Python 3.12 (subinterpreter support) +# Supports ASAN builds when ASAN_BUILD=1 +FROM erlang:27-slim + +# Prevent tzdata from asking for input +ENV DEBIAN_FRONTEND=noninteractive + +# Build arg for ASAN +ARG ASAN_BUILD=0 + +# Install build dependencies +RUN apt-get update && apt-get install -y \ + build-essential \ + cmake \ + git \ + curl \ + wget \ + libssl-dev \ + zlib1g-dev \ + libbz2-dev \ + libreadline-dev \ + libsqlite3-dev \ + libncurses5-dev \ + libncursesw5-dev \ + xz-utils \ + tk-dev \ + libffi-dev \ + liblzma-dev \ + clang \ + llvm \ + && rm -rf /var/lib/apt/lists/* + +# Build Python 3.12 from source (with ASAN if requested) +ARG PYTHON_VERSION=3.12.8 +RUN cd /tmp && \ + wget 
https://www.python.org/ftp/python/${PYTHON_VERSION}/Python-${PYTHON_VERSION}.tar.xz && \ + tar xf Python-${PYTHON_VERSION}.tar.xz && \ + cd Python-${PYTHON_VERSION} && \ + if [ "$ASAN_BUILD" = "1" ]; then \ + CC=clang CXX=clang++ \ + CFLAGS="-fsanitize=address -fno-omit-frame-pointer -g" \ + LDFLAGS="-fsanitize=address" \ + ./configure --prefix=/usr/local \ + --with-pydebug \ + --with-address-sanitizer \ + --enable-shared \ + LDFLAGS="-Wl,-rpath,/usr/local/lib -fsanitize=address"; \ + else \ + ./configure --prefix=/usr/local \ + --enable-optimizations \ + --with-lto \ + --enable-shared \ + LDFLAGS="-Wl,-rpath,/usr/local/lib"; \ + fi && \ + make -j$(nproc) && \ + make install && \ + cd / && rm -rf /tmp/Python-* + +# Verify Python installation +RUN python3 --version && \ + python3 -c "import sys; print('Python version:', sys.version)" + +# Set environment for rebar3 +ENV PYTHON_CONFIG=/usr/local/bin/python3-config +ENV LD_LIBRARY_PATH=/usr/local/lib:${LD_LIBRARY_PATH:-} + +# Set ASAN options +ENV ASAN_OPTIONS=detect_leaks=0:abort_on_error=1:print_stats=1 + +# Create working directory +WORKDIR /app + +# Copy source +COPY . 
/app + +# Remove problematic plugin +RUN sed -i '/rebar3_ex_doc/d' rebar.config || true + +# Persist ASAN_BUILD from build arg to env for runtime +ENV ASAN_BUILD=${ASAN_BUILD} + +# Run tests - do_cmake.sh will use ASAN flags via ASAN_BUILD env var +CMD rm -rf _build && rebar3 ct --readable=compact diff --git a/docker/Dockerfile.python314 b/docker/Dockerfile.python314 new file mode 100644 index 0000000..c5cf1af --- /dev/null +++ b/docker/Dockerfile.python314 @@ -0,0 +1,60 @@ +# Dockerfile for testing with Python 3.14 (free-threading support) +FROM erlang:27-slim + +# Install build dependencies +RUN apt-get update && apt-get install -y \ + build-essential \ + cmake \ + git \ + curl \ + wget \ + libssl-dev \ + zlib1g-dev \ + libbz2-dev \ + libreadline-dev \ + libsqlite3-dev \ + libncurses5-dev \ + libncursesw5-dev \ + xz-utils \ + tk-dev \ + libffi-dev \ + liblzma-dev \ + && rm -rf /var/lib/apt/lists/* + +# Build Python 3.14 with free-threading enabled +ARG PYTHON_VERSION=3.14.3 +RUN cd /tmp && \ + wget https://www.python.org/ftp/python/${PYTHON_VERSION}/Python-${PYTHON_VERSION}.tar.xz && \ + tar xf Python-${PYTHON_VERSION}.tar.xz && \ + cd Python-${PYTHON_VERSION} && \ + ./configure --prefix=/usr/local \ + --enable-optimizations \ + --with-lto \ + --disable-gil \ + --enable-shared \ + LDFLAGS="-Wl,-rpath,/usr/local/lib" && \ + make -j$(nproc) && \ + make install && \ + cd / && rm -rf /tmp/Python-* + +# Verify Python installation +RUN python3 --version && \ + python3 -c "import sys; print('Free-threading:', hasattr(sys, '_is_gil_enabled') and not sys._is_gil_enabled())" + +# Set environment for rebar3 +ENV PYTHON_CONFIG=/usr/local/bin/python3-config +ENV LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH + +# Create working directory +WORKDIR /app + +# Copy source +COPY . 
/app + +# Remove problematic plugin and build +RUN sed -i '/rebar3_ex_doc/d' rebar.config || true && \ + rm -rf _build && \ + rebar3 compile + +# Run tests +CMD ["rebar3", "ct", "--readable=compact"] diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml new file mode 100644 index 0000000..e868d0d --- /dev/null +++ b/docker/docker-compose.yml @@ -0,0 +1,31 @@ +services: + # Python 3.14 with free-threading (no GIL) + python314: + build: + context: .. + dockerfile: docker/Dockerfile.python314 + environment: + - PYTHON_CONFIG=/usr/local/bin/python3-config + command: rebar3 ct --readable=compact + + # Python 3.13 with Address Sanitizer + asan: + build: + context: .. + dockerfile: docker/Dockerfile.asan + environment: + - PYTHON_CONFIG=/usr/local/bin/python3-config + - ASAN_OPTIONS=detect_leaks=0:abort_on_error=1:print_stats=1 + - ASAN_BUILD=1 + + # Python 3.12 with Address Sanitizer + asan312: + build: + context: .. + dockerfile: docker/Dockerfile.python312 + args: + - ASAN_BUILD=1 + environment: + - PYTHON_CONFIG=/usr/local/bin/python3-config + - ASAN_OPTIONS=detect_leaks=0:abort_on_error=1:print_stats=1 + - ASAN_BUILD=1 diff --git a/docker/run-tests.sh b/docker/run-tests.sh new file mode 100755 index 0000000..5a12243 --- /dev/null +++ b/docker/run-tests.sh @@ -0,0 +1,61 @@ +#!/bin/bash +# Run tests in Docker with different Python configurations + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PROJECT_DIR="$(dirname "$SCRIPT_DIR")" + +cd "$SCRIPT_DIR" + +usage() { + echo "Usage: $0 [python314|asan|all]" + echo "" + echo "Options:" + echo " python314 - Run tests with Python 3.14 (free-threading)" + echo " asan - Run tests with Python + Address Sanitizer" + echo " all - Run both test configurations" + echo "" + echo "Examples:" + echo " $0 python314" + echo " $0 asan" + echo " $0 all" +} + +build_and_run() { + local target=$1 + echo "========================================" + echo "Building and running: $target" + echo 
"========================================" + + docker compose build "$target" + docker compose run --rm "$target" +} + +case "${1:-all}" in + python314) + build_and_run python314 + ;; + asan) + build_and_run asan + ;; + all) + build_and_run python314 + echo "" + build_and_run asan + ;; + -h|--help) + usage + exit 0 + ;; + *) + echo "Error: Unknown option '$1'" + usage + exit 1 + ;; +esac + +echo "" +echo "========================================" +echo "All tests completed successfully!" +echo "========================================" diff --git a/examples/bench_async_task.erl b/examples/bench_async_task.erl new file mode 100644 index 0000000..3a87cd2 --- /dev/null +++ b/examples/bench_async_task.erl @@ -0,0 +1,197 @@ +#!/usr/bin/env escript +%% -*- erlang -*- +%%! -pa _build/default/lib/erlang_python/ebin + +%%% @doc Benchmark script for async task API performance. +%%% +%%% Tests the new py_event_loop async task API: +%%% - py_event_loop:run/3,4 (blocking) +%%% - py_event_loop:create_task/3,4 + await (non-blocking) +%%% - py_event_loop:spawn/3,4 (fire-and-forget) +%%% +%%% Run with: +%%% rebar3 compile && escript examples/bench_async_task.erl + +-mode(compile). + +main(Args) -> + Opts = parse_args(Args), + + case maps:get(help, Opts, false) of + true -> + print_help(), + halt(0); + false -> + ok + end, + + io:format("~n=== Async Task API Benchmark ===~n~n"), + + {ok, _} = application:ensure_all_started(erlang_python), + {ok, _} = py:start_contexts(), + + %% Give event loop time to initialize + timer:sleep(500), + + %% Verify event loop is ready + case py_event_loop:get_loop() of + {ok, LoopRef} -> + io:format("Event loop initialized: ~p~n", [LoopRef]); + {error, Reason} -> + io:format("Event loop failed: ~p~n", [Reason]), + halt(1) + end, + + print_system_info(), + setup_python(), + + Mode = maps:get(mode, Opts, standard), + run_benchmarks(Mode), + + io:format("~n=== Benchmark Complete ===~n"), + halt(0). 
+ +parse_args(Args) -> + parse_args(Args, #{mode => standard}). + +parse_args([], Acc) -> Acc; +parse_args(["--quick" | Rest], Acc) -> + parse_args(Rest, Acc#{mode => quick}); +parse_args(["--full" | Rest], Acc) -> + parse_args(Rest, Acc#{mode => full}); +parse_args(["--help" | _], Acc) -> + Acc#{help => true}; +parse_args([_ | Rest], Acc) -> + parse_args(Rest, Acc). + +print_help() -> + io:format("Usage: escript examples/bench_async_task.erl [OPTIONS]~n~n"), + io:format("Options:~n"), + io:format(" --quick Run quick benchmark (fewer iterations)~n"), + io:format(" --full Run full benchmark (more iterations)~n"), + io:format(" --help Show this help~n"). + +print_system_info() -> + io:format("System Information:~n"), + io:format(" Erlang/OTP: ~s~n", [erlang:system_info(otp_release)]), + io:format(" Schedulers: ~p~n", [erlang:system_info(schedulers)]), + io:format(" Dirty Schedulers: ~p~n", [erlang:system_info(dirty_cpu_schedulers)]), + {ok, PyVer} = py:version(), + io:format(" Python: ~s~n", [PyVer]), + io:format("~n"). + +setup_python() -> + io:format("Python stdlib ready.~n"), + + %% Smoke test with math.sqrt (stdlib function) + %% Note: Functions defined via py:exec run in a different context + %% than process_ready_tasks, so we use stdlib functions instead. + io:format("Smoke test (math.sqrt via create_task/await): "), + Ref = py_event_loop:create_task(math, sqrt, [16.0]), + case py_event_loop:await(Ref, 2000) of + {ok, 4.0} -> + io:format("PASS~n~n"); + Other -> + io:format("FAIL: ~p~n", [Other]), + halt(1) + end. 
%% @doc Dispatch the benchmark suite selected on the command line.
%% quick    -> small iteration counts for a fast sanity run
%% full     -> large iteration counts, including two concurrency shapes
%% standard -> the default, mid-sized run
run_benchmarks(quick) ->
    io:format("Running quick benchmarks...~n~n"),
    run_plan([{baseline, 100},
              {create_await, 100},
              {concurrent, 10, 10},
              {fire_forget, 100}]);
run_benchmarks(full) ->
    io:format("Running full benchmarks...~n~n"),
    run_plan([{baseline, 5000},
              {create_await, 5000},
              {concurrent, 50, 100},
              {concurrent, 100, 50},
              {fire_forget, 1000}]);
run_benchmarks(standard) ->
    io:format("Running standard benchmarks...~n~n"),
    run_plan([{baseline, 1000},
              {create_await, 1000},
              {concurrent, 20, 50},
              {fire_forget, 500}]).

%% @private Execute each benchmark step of a plan, in order.
run_plan(Steps) ->
    lists:foreach(fun run_step/1, Steps).

%% @private Map a plan entry to its benchmark function.
run_step({baseline, N})               -> bench_baseline(N);
run_step({create_await, N})           -> bench_create_task_await(N);
run_step({concurrent, Procs, PerProc}) -> bench_concurrent_tasks(Procs, PerProc);
run_step({fire_forget, N})            -> bench_spawn_fire_forget(N).

%% @doc Baseline: synchronous py:call of math.sqrt, for comparison
%% against the async task API.
bench_baseline(N) ->
    io:format("Benchmark: Baseline py:call (sync)~n"),
    io:format("  Iterations: ~p~n", [N]),
    {Elapsed, _} = timer:tc(fun() ->
        [begin {ok, _} = py:call(math, sqrt, [K]) end || K <- lists:seq(1, N)]
    end),
    print_results(Elapsed, N, "calls").

%% @doc Submit one task at a time and block on its result
%% (non-blocking submission, blocking await).
bench_create_task_await(N) ->
    io:format("Benchmark: create_task + await (math.sqrt)~n"),
    io:format("  Iterations: ~p~n", [N]),
    {Elapsed, _} = timer:tc(fun() ->
        [await_one(K) || K <- lists:seq(1, N)]
    end),
    print_results(Elapsed, N, "tasks").

%% @private Create one math.sqrt task and block until it completes.
await_one(K) ->
    TaskRef = py_event_loop:create_task(math, sqrt, [float(K)]),
    {ok, _} = py_event_loop:await(TaskRef).

%% @doc Run TasksPerProc awaited tasks from each of NumProcs linked
%% worker processes, then report aggregate throughput.
bench_concurrent_tasks(NumProcs, TasksPerProc) ->
    TotalTasks = NumProcs * TasksPerProc,
    io:format("Benchmark: Concurrent tasks (math.sqrt)~n"),
    io:format("  Processes: ~p, Tasks/process: ~p, Total: ~p~n",
              [NumProcs, TasksPerProc, TotalTasks]),

    Owner = self(),

    {Elapsed, _} = timer:tc(fun() ->
        Workers = [spawn_link(fun() ->
                       [await_one(K) || K <- lists:seq(1, TasksPerProc)],
                       Owner ! {done, self()}
                   end) || _ <- lists:seq(1, NumProcs)],
        %% Selective receive per worker pid: waits for every worker.
        lists:foreach(fun(W) -> receive {done, W} -> ok end end, Workers)
    end),

    ElapsedMs = Elapsed / 1000,
    Throughput = TotalTasks / (ElapsedMs / 1000),

    io:format("  Total time: ~.2f ms~n", [ElapsedMs]),
    io:format("  Throughput: ~p tasks/sec~n~n", [round(Throughput)]).

%% @doc Fire-and-forget spawns: measures submission cost only, since
%% no result is awaited.
bench_spawn_fire_forget(N) ->
    io:format("Benchmark: spawn_task fire-and-forget (math.sqrt)~n"),
    io:format("  Iterations: ~p~n", [N]),
    {Elapsed, _} = timer:tc(fun() ->
        [ok = py_event_loop:spawn_task(math, sqrt, [float(K)]) || K <- lists:seq(1, N)]
    end),
    print_results(Elapsed, N, "spawns"),
    %% Give spawned tasks a moment to finish before the next benchmark.
    timer:sleep(100).

%% @doc Print total time, per-operation latency and throughput.
%% TimeUs is in microseconds, as returned by timer:tc/1.
print_results(TimeUs, Count, Unit) ->
    Millis = TimeUs / 1000,
    PerOp = Millis / Count,
    PerSecond = Count / (Millis / 1000),

    io:format("  Total time: ~.2f ms~n", [Millis]),
    io:format("  Per ~s: ~.3f ms~n", [Unit, PerOp]),
    io:format("  Throughput: ~p ~s/sec~n~n", [round(PerSecond), Unit]).
diff --git a/priv/test_async_task.py b/priv/test_async_task.py new file mode 100644 index 0000000..a0c0746 --- /dev/null +++ b/priv/test_async_task.py @@ -0,0 +1,21 @@ +"""Test async task for uvloop-inspired API.""" +import asyncio + +async def simple_task(): + """A simple async task that returns a value.""" + await asyncio.sleep(0.01) + return "hello from async" + +async def task_with_args(x, y): + """An async task that takes arguments.""" + await asyncio.sleep(0.01) + return x + y + +async def failing_task(): + """An async task that raises an exception.""" + await asyncio.sleep(0.01) + raise ValueError("intentional error") + +def sync_func(): + """A regular non-async function.""" + return "sync result" diff --git a/src/py.erl b/src/py.erl index a272c6c..72e7b15 100644 --- a/src/py.erl +++ b/src/py.erl @@ -98,6 +98,8 @@ ensure_venv/2, ensure_venv/3, activate_venv/1, + %% Process-local Python environment + get_local_env/1, deactivate_venv/0, venv_info/0, %% Execution info @@ -153,11 +155,47 @@ %% Default timeout for synchronous calls (30 seconds) -define(DEFAULT_TIMEOUT, 30000). +%% Process dictionary key for local Python environment +-define(LOCAL_ENV_KEY, py_local_env). + +%% @doc Get or create a process-local Python environment for a context. +%% +%% Each Erlang process can have Python environments per interpreter. +%% The environments are stored in the process dictionary keyed by interpreter ID +%% and are automatically freed when the process exits. +%% +%% The environment is created inside the context's interpreter to ensure +%% the correct memory allocator is used. This is critical for subinterpreters +%% where each interpreter has its own memory allocator. +%% +%% @param Ctx Context pid +%% @returns EnvRef - NIF resource reference to the Python environment +-spec get_local_env(pid()) -> reference(). 
+get_local_env(Ctx) when is_pid(Ctx) -> + {ok, InterpId} = py_context:get_interp_id(Ctx), + Envs = case get(?LOCAL_ENV_KEY) of + undefined -> #{}; + M when is_map(M) -> M; + %% Handle legacy single-ref format (shouldn't happen but be safe) + _OldRef -> #{} + end, + case maps:get(InterpId, Envs, undefined) of + undefined -> + {ok, Ref} = py_context:create_local_env(Ctx), + put(?LOCAL_ENV_KEY, Envs#{InterpId => Ref}), + Ref; + Ref -> + Ref + end. + %%% ============================================================================ %%% Synchronous API %%% ============================================================================ %% @doc Call a Python function synchronously. +%% +%% In worker mode, the call uses the process-local Python environment, +%% allowing access to functions defined via py:exec() in the same process. -spec call(py_module(), py_func(), py_args()) -> py_result(). call(Module, Func, Args) -> call(Module, Func, Args, #{}). @@ -169,6 +207,8 @@ call(Module, Func, Args) -> %% - `call(Pool, Module, Func, Args)' - Call using a named pool (default, io, etc.) %% - `call(Module, Func, Args, Kwargs)' - Call with keyword arguments on default pool %% +%% In worker mode, calls use the process-local Python environment. +%% %% @param CtxOrPoolOrModule Context pid, pool name, or Python module %% @param ModuleOrFunc Python module or function name %% @param FuncOrArgs Function name or arguments list @@ -177,7 +217,8 @@ call(Module, Func, Args) -> ; (py_context_router:pool_name(), py_module(), py_func(), py_args()) -> py_result() ; (py_module(), py_func(), py_args(), py_kwargs()) -> py_result(). 
call(Ctx, Module, Func, Args) when is_pid(Ctx) -> - py_context:call(Ctx, Module, Func, Args, #{}); + EnvRef = get_local_env(Ctx), + py_context:call(Ctx, Module, Func, Args, #{}, infinity, EnvRef); call(Pool, Module, Func, Args) when is_atom(Pool), is_atom(Func) -> %% Pool-based call (e.g., py:call(io, math, sqrt, [16])) call(Pool, Module, Func, Args, #{}); @@ -199,7 +240,8 @@ call(Module, Func, Args, Kwargs) -> call(Ctx, Module, Func, Args, Opts) when is_pid(Ctx), is_map(Opts) -> Kwargs = maps:get(kwargs, Opts, #{}), Timeout = maps:get(timeout, Opts, infinity), - py_context:call(Ctx, Module, Func, Args, Kwargs, Timeout); + EnvRef = get_local_env(Ctx), + py_context:call(Ctx, Module, Func, Args, Kwargs, Timeout, EnvRef); call(Pool, Module, Func, Args, Kwargs) when is_atom(Pool), is_atom(Func), is_map(Kwargs) -> %% Pool-based call with kwargs (e.g., py:call(io, math, pow, [2, 3], #{round => true})) do_pool_call(Pool, Module, Func, Args, Kwargs, ?DEFAULT_TIMEOUT); @@ -210,12 +252,14 @@ call(Module, Func, Args, Kwargs, Timeout) -> %% @private %% Call using a named pool with semaphore protection +%% Uses the process-local environment from the calling process do_pool_call(Pool, Module, Func, Args, Kwargs, Timeout) -> case py_semaphore:acquire(Timeout) of ok -> try Ctx = py_context_router:get_context(Pool), - py_context:call(Ctx, Module, Func, Args, Kwargs, Timeout) + EnvRef = get_local_env(Ctx), + py_context:call(Ctx, Module, Func, Args, Kwargs, Timeout, EnvRef) after py_semaphore:release() end; @@ -224,6 +268,9 @@ do_pool_call(Pool, Module, Func, Args, Kwargs, Timeout) -> end. %% @doc Evaluate a Python expression and return the result. +%% +%% In worker mode, evaluation uses the process-local Python environment. +%% Variables defined via exec are visible in eval within the same process. -spec eval(string() | binary()) -> py_result(). eval(Code) -> eval(Code, #{}). @@ -231,44 +278,54 @@ eval(Code) -> %% @doc Evaluate a Python expression with local variables. 
%% %% When the first argument is a pid (context), evaluates using the new -%% process-per-context architecture. +%% process-per-context architecture with process-local environment. -spec eval(pid(), string() | binary()) -> py_result() ; (string() | binary(), map()) -> py_result(). eval(Ctx, Code) when is_pid(Ctx) -> - py_context:eval(Ctx, Code, #{}); + EnvRef = get_local_env(Ctx), + py_context:eval(Ctx, Code, #{}, infinity, EnvRef); eval(Code, Locals) -> eval(Code, Locals, ?DEFAULT_TIMEOUT). %% @doc Evaluate a Python expression with local variables and timeout. %% %% When the first argument is a pid (context), evaluates using the new -%% process-per-context architecture with locals. +%% process-per-context architecture with process-local environment. %% %% Timeout is in milliseconds. Use `infinity' for no timeout. -spec eval(pid(), string() | binary(), map()) -> py_result() ; (string() | binary(), map(), timeout()) -> py_result(). eval(Ctx, Code, Locals) when is_pid(Ctx), is_map(Locals) -> - py_context:eval(Ctx, Code, Locals); + EnvRef = get_local_env(Ctx), + py_context:eval(Ctx, Code, Locals, infinity, EnvRef); eval(Code, Locals, Timeout) -> %% Always route through context process - it handles callbacks inline using %% suspension-based approach (no separate callback handler, no blocking) Ctx = py_context_router:get_context(), - py_context:eval(Ctx, Code, Locals, Timeout). + EnvRef = get_local_env(Ctx), + py_context:eval(Ctx, Code, Locals, Timeout, EnvRef). %% @doc Execute Python statements (no return value expected). +%% +%% In worker mode, the code runs in a process-local Python environment. +%% Variables defined via exec persist within the calling Erlang process. +%% In subinterpreter mode, each context has its own isolated namespace. -spec exec(string() | binary()) -> ok | {error, term()}. 
exec(Code) -> %% Always route through context process - it handles callbacks inline using %% suspension-based approach (no separate callback handler, no blocking) Ctx = py_context_router:get_context(), - py_context:exec(Ctx, Code). + EnvRef = get_local_env(Ctx), + py_context:exec(Ctx, Code, EnvRef). %% @doc Execute Python statements using a specific context. %% %% This is the explicit context variant of exec/1. +%% Uses the process-local environment for the calling process. -spec exec(pid(), string() | binary()) -> ok | {error, term()}. exec(Ctx, Code) when is_pid(Ctx) -> - py_context:exec(Ctx, Code). + EnvRef = get_local_env(Ctx), + py_context:exec(Ctx, Code, EnvRef). %%% ============================================================================ %%% Asynchronous API diff --git a/src/py_context.erl b/src/py_context.erl index a769dce..944d5c4 100644 --- a/src/py_context.erl +++ b/src/py_context.erl @@ -38,12 +38,17 @@ stop/1, call/5, call/6, + call/7, eval/3, eval/4, + eval/5, exec/2, + exec/3, call_method/4, to_term/1, - get_interp_id/1 + get_interp_id/1, + is_subinterp/1, + create_local_env/1 ]). %% Internal exports @@ -142,6 +147,34 @@ call(Ctx, Module, Func, Args, Kwargs, Timeout) when is_pid(Ctx) -> {error, timeout} end. +%% @doc Call a Python function with a process-local environment. +%% +%% @param Ctx Context process +%% @param Module Python module name +%% @param Func Function name +%% @param Args List of arguments +%% @param Kwargs Map of keyword arguments +%% @param Timeout Timeout in milliseconds +%% @param EnvRef Process-local environment reference +%% @returns {ok, Result} | {error, Reason} +-spec call(context(), atom() | binary(), atom() | binary(), list(), map(), + timeout(), reference()) -> {ok, term()} | {error, term()}. +call(Ctx, Module, Func, Args, Kwargs, Timeout, EnvRef) when is_pid(Ctx), is_reference(EnvRef) -> + MRef = erlang:monitor(process, Ctx), + ModuleBin = to_binary(Module), + FuncBin = to_binary(Func), + Ctx ! 
{call, self(), MRef, ModuleBin, FuncBin, Args, Kwargs, EnvRef}, + receive + {MRef, Result} -> + erlang:demonitor(MRef, [flush]), + Result; + {'DOWN', MRef, process, Ctx, Reason} -> + {error, {context_died, Reason}} + after Timeout -> + erlang:demonitor(MRef, [flush]), + {error, timeout} + end. + %% @doc Evaluate a Python expression. %% %% @param Ctx Context process @@ -171,6 +204,31 @@ eval(Ctx, Code, Locals, Timeout) when is_pid(Ctx) -> {error, timeout} end. +%% @doc Evaluate a Python expression with a process-local environment. +%% +%% @param Ctx Context process +%% @param Code Python code to evaluate +%% @param Locals Map of local variables +%% @param Timeout Timeout in milliseconds +%% @param EnvRef Process-local environment reference +%% @returns {ok, Result} | {error, Reason} +-spec eval(context(), binary() | string(), map(), timeout(), reference()) -> + {ok, term()} | {error, term()}. +eval(Ctx, Code, Locals, Timeout, EnvRef) when is_pid(Ctx), is_reference(EnvRef) -> + MRef = erlang:monitor(process, Ctx), + CodeBin = to_binary(Code), + Ctx ! {eval, self(), MRef, CodeBin, Locals, EnvRef}, + receive + {MRef, Result} -> + erlang:demonitor(MRef, [flush]), + Result; + {'DOWN', MRef, process, Ctx, Reason} -> + {error, {context_died, Reason}} + after Timeout -> + erlang:demonitor(MRef, [flush]), + {error, timeout} + end. + %% @doc Execute Python statements. %% %% @param Ctx Context process @@ -192,6 +250,28 @@ exec(Ctx, Code) when is_pid(Ctx) -> {error, timeout} end. +%% @doc Execute Python statements with a process-local environment. +%% +%% @param Ctx Context process +%% @param Code Python code to execute +%% @param EnvRef Process-local environment reference +%% @returns ok | {error, Reason} +-spec exec(context(), binary() | string(), reference()) -> ok | {error, term()}. +exec(Ctx, Code, EnvRef) when is_pid(Ctx), is_reference(EnvRef) -> + MRef = erlang:monitor(process, Ctx), + CodeBin = to_binary(Code), + Ctx ! 
{exec, self(), MRef, CodeBin, EnvRef}, + receive + {MRef, Result} -> + erlang:demonitor(MRef, [flush]), + Result; + {'DOWN', MRef, process, Ctx, Reason} -> + {error, {context_died, Reason}} + after infinity -> + erlang:demonitor(MRef, [flush]), + {error, timeout} + end. + %% @doc Call a method on a Python object reference. -spec call_method(context(), reference(), atom() | binary(), list()) -> {ok, term()} | {error, term()}. @@ -226,6 +306,43 @@ get_interp_id(Ctx) when is_pid(Ctx) -> {error, {context_died, Reason}} end. +%% @doc Check if this context is a subinterpreter. +%% +%% Returns true for subinterpreter mode, false for worker mode. +%% In worker mode, process-local environments are used. +%% In subinterpreter mode, each context has its own isolated namespace. +-spec is_subinterp(context()) -> boolean(). +is_subinterp(Ctx) when is_pid(Ctx) -> + MRef = erlang:monitor(process, Ctx), + Ctx ! {is_subinterp, self(), MRef}, + receive + {MRef, Result} -> + erlang:demonitor(MRef, [flush]), + Result; + {'DOWN', MRef, process, Ctx, _Reason} -> + false + end. + +%% @doc Create a process-local Python environment for this context. +%% +%% The environment is created inside the context's interpreter to ensure +%% the correct memory allocator is used. This is critical for subinterpreters +%% where each interpreter has its own memory allocator. +%% +%% The returned EnvRef should be stored in the calling process's dictionary, +%% keyed by interpreter ID. +-spec create_local_env(context()) -> {ok, reference()} | {error, term()}. +create_local_env(Ctx) when is_pid(Ctx) -> + MRef = erlang:monitor(process, Ctx), + Ctx ! {create_local_env, self(), MRef}, + receive + {MRef, Result} -> + erlang:demonitor(MRef, [flush]), + Result; + {'DOWN', MRef, process, Ctx, Reason} -> + {error, {context_died, Reason}} + end. 
+ %% ============================================================================ %% Internal functions %% ============================================================================ @@ -332,16 +449,34 @@ loop(#state{ref = Ref, interp_id = InterpId} = State) -> From ! {MRef, Result}, loop(State); + %% Call with process-local environment (worker mode) + {call, From, MRef, Module, Func, Args, Kwargs, EnvRef} -> + Result = handle_call_with_suspension_and_env(Ref, Module, Func, Args, Kwargs, EnvRef), + From ! {MRef, Result}, + loop(State); + {eval, From, MRef, Code, Locals} -> Result = handle_eval_with_suspension(Ref, Code, Locals), From ! {MRef, Result}, loop(State); + %% Eval with process-local environment (worker mode) + {eval, From, MRef, Code, Locals, EnvRef} -> + Result = handle_eval_with_suspension_and_env(Ref, Code, Locals, EnvRef), + From ! {MRef, Result}, + loop(State); + {exec, From, MRef, Code} -> Result = py_nif:context_exec(Ref, Code), From ! {MRef, Result}, loop(State); + %% Exec with process-local environment (worker mode) + {exec, From, MRef, Code, EnvRef} -> + Result = py_nif:context_exec(Ref, Code, EnvRef), + From ! {MRef, Result}, + loop(State); + {call_method, From, MRef, ObjRef, Method, Args} -> Result = py_nif:context_call_method(Ref, ObjRef, Method, Args), From ! {MRef, Result}, @@ -351,6 +486,20 @@ loop(#state{ref = Ref, interp_id = InterpId} = State) -> From ! {MRef, {ok, InterpId}}, loop(State); + {is_subinterp, From, MRef} -> + %% Check the interp_id to determine if this is a subinterpreter + %% Subinterpreters have interp_id > 0 (main interpreter is 0) + %% But actually we need to check the mode, not just interp_id + IsSubinterp = is_context_subinterp(Ref), + From ! {MRef, IsSubinterp}, + loop(State); + + {create_local_env, From, MRef} -> + %% Create env inside this context's interpreter + Result = py_nif:create_local_env(Ref), + From ! {MRef, Result}, + loop(State); + {stop, From, MRef} -> terminate(normal, State), From ! 
{MRef, ok}; @@ -506,6 +655,37 @@ handle_eval_with_suspension(Ref, Code, Locals) -> Result end. +%% @private +%% Handle call with process-local environment +handle_call_with_suspension_and_env(Ref, Module, Func, Args, Kwargs, EnvRef) -> + case py_nif:context_call(Ref, Module, Func, Args, Kwargs, EnvRef) of + {suspended, _CallbackId, StateRef, {FuncName, CallbackArgs}} -> + CallbackResult = handle_callback_with_nested_receive(Ref, FuncName, CallbackArgs), + resume_and_continue(Ref, StateRef, CallbackResult); + {schedule, CallbackName, CallbackArgs} -> + handle_schedule(Ref, CallbackName, CallbackArgs); + Result -> + Result + end. + +%% @private +%% Handle eval with process-local environment +handle_eval_with_suspension_and_env(Ref, Code, Locals, EnvRef) -> + case py_nif:context_eval(Ref, Code, Locals, EnvRef) of + {suspended, _CallbackId, StateRef, {FuncName, CallbackArgs}} -> + CallbackResult = handle_callback_with_nested_receive(Ref, FuncName, CallbackArgs), + resume_and_continue(Ref, StateRef, CallbackResult); + {schedule, CallbackName, CallbackArgs} -> + handle_schedule(Ref, CallbackName, CallbackArgs); + Result -> + Result + end. + +%% @private +%% Check if a context is a subinterpreter (has interp_id > 0) +is_context_subinterp(Ref) -> + py_nif:context_interp_id(Ref) > 0. + %% @private %% Handle schedule marker - Python returned erlang.schedule() or schedule_py() %% Execute the callback and return its result transparently to the caller. @@ -581,18 +761,36 @@ wait_for_callback(Ref, CallbackPid) -> From ! {MRef, NestedResult}, wait_for_callback(Ref, CallbackPid); - %% Handle nested py:eval while waiting for callback + %% Handle nested py:call while waiting for callback (with EnvRef) + {call, From, MRef, Module, Func, Args, Kwargs, EnvRef} -> + NestedResult = handle_call_with_suspension_and_env(Ref, Module, Func, Args, Kwargs, EnvRef), + From ! 
{MRef, NestedResult}, + wait_for_callback(Ref, CallbackPid); + + %% Handle nested py:eval while waiting for callback (without EnvRef) {eval, From, MRef, Code, Locals} -> NestedResult = handle_eval_with_suspension(Ref, Code, Locals), From ! {MRef, NestedResult}, wait_for_callback(Ref, CallbackPid); + %% Handle nested py:eval while waiting for callback (with EnvRef) + {eval, From, MRef, Code, Locals, EnvRef} -> + NestedResult = handle_eval_with_suspension_and_env(Ref, Code, Locals, EnvRef), + From ! {MRef, NestedResult}, + wait_for_callback(Ref, CallbackPid); + %% Handle nested py:exec while waiting for callback {exec, From, MRef, Code} -> NestedResult = py_nif:context_exec(Ref, Code), From ! {MRef, NestedResult}, wait_for_callback(Ref, CallbackPid); + %% Handle nested py:exec while waiting for callback (with EnvRef) + {exec, From, MRef, Code, EnvRef} -> + NestedResult = py_nif:context_exec(Ref, Code, EnvRef), + From ! {MRef, NestedResult}, + wait_for_callback(Ref, CallbackPid); + %% Handle nested call_method while waiting for callback {call_method, From, MRef, ObjRef, Method, Args} -> NestedResult = py_nif:context_call_method(Ref, ObjRef, Method, Args), @@ -601,9 +799,14 @@ wait_for_callback(Ref, CallbackPid) -> %% Handle get_interp_id while waiting {get_interp_id, From, MRef} -> - %% We can't get InterpId here, but we can query the NIF - InterpIdResult = py_nif:context_interp_id(Ref), - From ! {MRef, InterpIdResult}, + InterpId = py_nif:context_interp_id(Ref), + From ! {MRef, {ok, InterpId}}, + wait_for_callback(Ref, CallbackPid); + + %% Handle create_local_env while waiting + {create_local_env, From, MRef} -> + Result = py_nif:create_local_env(Ref), + From ! {MRef, Result}, wait_for_callback(Ref, CallbackPid) end. 
diff --git a/src/py_nif.erl b/src/py_nif.erl index b99bfca..7b17f9b 100644 --- a/src/py_nif.erl +++ b/src/py_nif.erl @@ -166,9 +166,13 @@ context_create/1, context_destroy/1, context_call/5, + context_call/6, context_eval/3, + context_eval/4, context_exec/2, + context_exec/3, context_call_method/4, + create_local_env/1, context_to_term/1, context_interp_id/1, context_set_callback_handler/2, @@ -1271,6 +1275,22 @@ context_destroy(_ContextRef) -> context_call(_ContextRef, _Module, _Func, _Args, _Kwargs) -> ?NIF_STUB. +%% @doc Call a Python function in a context with a process-local environment. +%% +%% @param ContextRef Context reference +%% @param Module Python module name +%% @param Func Function name +%% @param Args List of arguments +%% @param Kwargs Map of keyword arguments +%% @param EnvRef Process-local environment reference +%% @returns {ok, Result} | {error, Reason} | {suspended, ...} | {schedule, ...} +-spec context_call(reference(), binary(), binary(), list(), map(), reference()) -> + {ok, term()} | {error, term()} | + {suspended, non_neg_integer(), reference(), {atom(), list()}} | + {schedule, binary(), tuple()}. +context_call(_ContextRef, _Module, _Func, _Args, _Kwargs, _EnvRef) -> + ?NIF_STUB. + %% @doc Evaluate a Python expression in a context. %% %% NO MUTEX - caller must ensure exclusive access (process ownership). @@ -1286,6 +1306,20 @@ context_call(_ContextRef, _Module, _Func, _Args, _Kwargs) -> context_eval(_ContextRef, _Code, _Locals) -> ?NIF_STUB. +%% @doc Evaluate a Python expression in a context with a process-local environment. 
+%% +%% @param ContextRef Context reference +%% @param Code Python code to evaluate +%% @param Locals Map of local variables +%% @param EnvRef Process-local environment reference +%% @returns {ok, Result} | {error, Reason} | {suspended, ...} | {schedule, ...} +-spec context_eval(reference(), binary(), map(), reference()) -> + {ok, term()} | {error, term()} | + {suspended, non_neg_integer(), reference(), {atom(), list()}} | + {schedule, binary(), tuple()}. +context_eval(_ContextRef, _Code, _Locals, _EnvRef) -> + ?NIF_STUB. + %% @doc Execute Python statements in a context. %% %% NO MUTEX - caller must ensure exclusive access (process ownership). @@ -1297,6 +1331,16 @@ context_eval(_ContextRef, _Code, _Locals) -> context_exec(_ContextRef, _Code) -> ?NIF_STUB. +%% @doc Execute Python code in a context with a process-local environment. +%% +%% @param ContextRef Context reference +%% @param Code Python code to execute +%% @param EnvRef Process-local environment reference +%% @returns ok | {error, Reason} +-spec context_exec(reference(), binary(), reference()) -> ok | {error, term()}. +context_exec(_ContextRef, _Code, _EnvRef) -> + ?NIF_STUB. + %% @doc Call a method on a Python object in a context. %% %% NO MUTEX - caller must ensure exclusive access (process ownership). @@ -1315,6 +1359,22 @@ context_exec(_ContextRef, _Code) -> context_call_method(_ContextRef, _ObjRef, _Method, _Args) -> ?NIF_STUB. +%% @doc Create a new process-local Python environment. +%% +%% Creates a new Python globals/locals dict pair for use as a process-local +%% environment. The dicts are created inside the context's interpreter to +%% ensure the correct memory allocator is used. +%% +%% The returned resource should be stored in the process dictionary, keyed +%% by the interpreter ID. +%% When the process exits, the resource destructor frees the Python dicts. 
+%% +%% @param CtxRef Context reference (from context_create/1) +%% @returns {ok, EnvRef} | {error, Reason} +-spec create_local_env(reference()) -> {ok, reference()} | {error, term()}. +create_local_env(_CtxRef) -> + ?NIF_STUB. + %% @doc Convert a Python object reference to an Erlang term. %% %% The reference carries the interpreter ID, allowing automatic routing diff --git a/test/py_SUITE.erl b/test/py_SUITE.erl index 5732276..716828d 100644 --- a/test/py_SUITE.erl +++ b/test/py_SUITE.erl @@ -57,7 +57,12 @@ test_asgi_scope_caching/1, test_asgi_scope_method_caching/1, test_asgi_zero_copy_buffer/1, - test_asgi_lazy_headers/1 + test_asgi_lazy_headers/1, + %% Process-bound Python environment tests + test_process_env_isolation/1, + test_process_env_main_function/1, + test_process_env_state_persistence/1, + test_process_env_cleanup/1 ]). all() -> @@ -109,7 +114,12 @@ all() -> test_asgi_scope_caching, test_asgi_scope_method_caching, test_asgi_zero_copy_buffer, - test_asgi_lazy_headers + test_asgi_lazy_headers, + %% Process-bound Python environment tests + test_process_env_isolation, + test_process_env_main_function, + test_process_env_state_persistence, + test_process_env_cleanup ]. init_per_suite(Config) -> @@ -1249,3 +1259,97 @@ test_asgi_lazy_headers(_Config) -> ct:pal("Lazy headers test passed~n"), ok. + +%% ============================================================================ +%% Process-bound Python Environment Tests +%% ============================================================================ + +%% Test that different processes have isolated Python environments +test_process_env_isolation(_Config) -> + Parent = self(), + + %% Spawn process A and set x = 'A' + spawn(fun() -> + ok = py:exec(<<"x = 'A'">>), + {ok, <<"A">>} = py:eval(<<"x">>), + Parent ! {proc_a, ok} + end), + + %% Spawn process B and set x = 'B' + spawn(fun() -> + ok = py:exec(<<"x = 'B'">>), + {ok, <<"B">>} = py:eval(<<"x">>), + Parent ! 
{proc_b, ok} + end), + + %% Wait for both processes to complete + receive {proc_a, ok} -> ok after 5000 -> ct:fail("Process A timed out") end, + receive {proc_b, ok} -> ok after 5000 -> ct:fail("Process B timed out") end, + + ct:pal("Process isolation test passed~n"), + ok. + +%% Test that functions defined via exec() are accessible via __main__ module +test_process_env_main_function(_Config) -> + %% Define a function in the process-local environment + ok = py:exec(<<"def greet(name): return f'Hello {name}'">>), + + %% Call it via __main__ module + {ok, <<"Hello World">>} = py:call('__main__', greet, [<<"World">>]), + + %% Define another function + ok = py:exec(<<"def add(a, b): return a + b">>), + {ok, 7} = py:call('__main__', add, [3, 4]), + + ct:pal("__main__ function access test passed~n"), + ok. + +%% Test that state persists across multiple calls within the same process +test_process_env_state_persistence(_Config) -> + %% Initialize counter + ok = py:exec(<<"counter = 0">>), + + %% Increment counter multiple times + ok = py:exec(<<"counter += 1">>), + ok = py:exec(<<"counter += 1">>), + ok = py:exec(<<"counter += 1">>), + + %% Verify counter value + {ok, 3} = py:eval(<<"counter">>), + + %% Use a function to increment + ok = py:exec(<<"def increment(): global counter; counter += 1; return counter">>), + {ok, 4} = py:call('__main__', increment, []), + {ok, 5} = py:call('__main__', increment, []), + + %% Verify final value + {ok, 5} = py:eval(<<"counter">>), + + ct:pal("State persistence test passed~n"), + ok. 
+ +%% Test that process-local environment is cleaned up when process exits +test_process_env_cleanup(_Config) -> + %% Get initial memory stats + {ok, InitialStats} = py:memory_stats(), + ct:pal("Initial memory: ~p~n", [InitialStats]), + + %% Spawn a process that creates a large list + spawn(fun() -> + ok = py:exec(<<"big = list(range(1000000))">>), + timer:sleep(50) % Let it exist briefly + end), + + %% Wait for the process to exit + timer:sleep(100), + + %% Force GC + erlang:garbage_collect(), + py:gc(), + + %% Check that memory was freed (this is a soft check) + {ok, FinalStats} = py:memory_stats(), + ct:pal("Final memory: ~p~n", [FinalStats]), + + ct:pal("Cleanup test passed~n"), + ok.