From f78610d5f70d1fb85523aac3297b51c8808fdd91 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Fri, 13 Mar 2026 03:37:40 +0100 Subject: [PATCH 1/2] Fix channel receive timeout tracking Track elapsed time in microseconds to avoid integer division truncation (100/1000=0) that broke timeouts >1ms. --- c_src/py_callback.c | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/c_src/py_callback.c b/c_src/py_callback.c index 763cde8..c8c06d6 100644 --- a/c_src/py_callback.c +++ b/c_src/py_callback.c @@ -2330,8 +2330,9 @@ static PyObject *erlang_channel_receive_impl(PyObject *self, PyObject *args) { /* Need to wait - release GIL and poll */ { - long elapsed_ms = 0; + long elapsed_us = 0; const long poll_interval_us = 100; /* 100 microseconds */ + const long timeout_us = timeout_ms >= 0 ? timeout_ms * 1000 : -1; Py_BEGIN_ALLOW_THREADS @@ -2342,17 +2343,14 @@ static PyObject *erlang_channel_receive_impl(PyObject *self, PyObject *args) { } /* Check timeout */ - if (timeout_ms >= 0 && elapsed_ms >= timeout_ms) { + if (timeout_us >= 0 && elapsed_us >= timeout_us) { result = 2; /* Timeout */ break; } /* Sleep briefly */ usleep(poll_interval_us); - elapsed_ms += poll_interval_us / 1000; - if (poll_interval_us < 1000) { - elapsed_ms = (elapsed_ms == 0 && poll_interval_us > 0) ? 1 : elapsed_ms; - } + elapsed_us += poll_interval_us; } Py_END_ALLOW_THREADS From c5146050224a2ad36266fe36f61760f8d19ff39d Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Fri, 13 Mar 2026 09:29:31 +0100 Subject: [PATCH 2/2] Refactor py:cast to fire-and-forget, add py:spawn_call - py:cast now returns ok immediately (no await) - py:spawn_call returns ref for use with py:await - Both support explicit context: cast/4,5 and spawn_call/4,5 - Update docs and examples --- README.md | 14 ++++++--- docs/ai-integration.md | 4 +-- docs/getting-started.md | 7 +++-- docs/migration.md | 10 ++++--- docs/pools.md | 2 +- examples/basic_example.erl | 6 ++-- examples/benchmark_compare.erl | 14 ++++----- src/py.erl | 53 ++++++++++++++++++++++++++++++---- test/py_SUITE.erl | 17 +++++++++-- 9 files changed, 96 insertions(+), 31 deletions(-) diff --git a/README.md b/README.md index 79f1b5c..299b12a 100644 --- a/README.md +++ b/README.md @@ -66,10 +66,13 @@ application:ensure_all_started(erlang_python). %% Evaluate with local variables {ok, 25} = py:eval(<<"x * y">>, #{x => 5, y => 5}). -%% Async calls -Ref = py:cast(math, factorial, [100]), +%% Async calls with await +Ref = py:spawn_call(math, factorial, [100]), {ok, Result} = py:await(Ref). +%% Fire-and-forget (no result) +ok = py:cast(erlang, send, [self(), {done, <<"task1">>}]). + %% Streaming from generators {ok, [0,1,4,9,16]} = py:stream_eval(<<"(x**2 for x in range(5))">>). ``` @@ -443,10 +446,13 @@ escript examples/logging_example.erl {ok, Result} = py:call(Module, Function, Args, KwArgs). {ok, Result} = py:call(Module, Function, Args, KwArgs, Timeout). -%% Async -Ref = py:cast(Module, Function, Args). +%% Async with result +Ref = py:spawn_call(Module, Function, Args). {ok, Result} = py:await(Ref). {ok, Result} = py:await(Ref, Timeout). + +%% Fire-and-forget (no result returned) +ok = py:cast(Module, Function, Args). ``` ### Expression Evaluation diff --git a/docs/ai-integration.md b/docs/ai-integration.md index 71af921..cc08dcd 100644 --- a/docs/ai-integration.md +++ b/docs/ai-integration.md @@ -503,9 +503,9 @@ This demonstrates: For non-blocking LLM calls: ```erlang -%% Start async LLM call +%% Start async LLM call (returns ref for await) ask_async(Question) -> - py:cast('__main__', generate, [Question, <<"">>]). + py:spawn_call('__main__', generate, [Question, <<"">>]). %% Gather multiple responses ask_many(Questions) -> diff --git a/docs/getting-started.md b/docs/getting-started.md index 12b4598..5d2e7ac 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -94,13 +94,16 @@ All operations support optional timeouts: For non-blocking operations: ```erlang -%% Start async call -Ref = py:cast(math, factorial, [1000]). +%% Start async call (returns ref for await) +Ref = py:spawn_call(math, factorial, [1000]). %% Do other work... %% Wait for result {ok, HugeNumber} = py:await(Ref). + +%% Fire-and-forget (no result) +ok = py:cast(some_module, log_event, [EventData]). ``` ## Streaming from Generators diff --git a/docs/migration.md b/docs/migration.md index a108216..30d174d 100644 --- a/docs/migration.md +++ b/docs/migration.md @@ -4,7 +4,7 @@ This guide covers breaking changes and migration steps when upgrading from erlan ## Quick Checklist -- [ ] Rename `py:call_async` → `py:cast` +- [ ] Rename `py:call_async` → `py:spawn_call` (with await) or `py:cast` (fire-and-forget) - [ ] Replace `py:bind`/`py:unbind` with `py_context_router` - [ ] Replace `py:ctx_*` functions with `py_context:*` - [ ] Replace `erlang_asyncio` imports with `erlang` @@ -148,7 +148,7 @@ N = py_context_router:num_contexts(). ## API Changes -### `py:call_async` renamed to `py:cast` +### `py:call_async` renamed to `py:spawn_call` The function for non-blocking Python calls has been renamed to follow gen_server conventions: @@ -160,11 +160,13 @@ Ref = py:call_async(math, factorial, [100]), **After (v2.0):** ```erlang -Ref = py:cast(math, factorial, [100]), +Ref = py:spawn_call(math, factorial, [100]), {ok, Result} = py:await(Ref). ``` -The semantics are identical - only the name changed. +The semantics are identical - `spawn_call` replaces `async_call`. + +Note: `py:cast/3,4` is now fire-and-forget (returns `ok`, no await). ### `erlang_asyncio` module removed diff --git a/docs/pools.md b/docs/pools.md index e406860..df6a1a6 100644 --- a/docs/pools.md +++ b/docs/pools.md @@ -221,7 +221,7 @@ py:register_pool(io, requests), %% API calls process_batch(Items) -> %% Parallel fetch from S3 (io pool) - Futures = [py:cast(boto3, download_file, [Key]) || Key <- Items], + Futures = [py:spawn_call(boto3, download_file, [Key]) || Key <- Items], Files = [py:await(F) || F <- Futures], %% Process with ML model (default pool - doesn't block I/O) diff --git a/examples/basic_example.erl b/examples/basic_example.erl index b3a95da..7f5b339 100644 --- a/examples/basic_example.erl +++ b/examples/basic_example.erl @@ -52,9 +52,9 @@ main(_) -> io:format("~n=== Async Calls ===~n~n"), - %% Async call - Ref1 = py:cast(math, factorial, [10]), - Ref2 = py:cast(math, factorial, [20]), + %% Async call with spawn_call/await + Ref1 = py:spawn_call(math, factorial, [10]), + Ref2 = py:spawn_call(math, factorial, [20]), {ok, Fact10} = py:await(Ref1), {ok, Fact20} = py:await(Ref2), diff --git a/examples/benchmark_compare.erl b/examples/benchmark_compare.erl index 3065454..c7bfc8b 100644 --- a/examples/benchmark_compare.erl +++ b/examples/benchmark_compare.erl @@ -101,7 +101,7 @@ bench_sync_eval() -> %% ============================================================================ bench_cast_single() -> - Name = "Cast py:cast single", + Name = "Cast py:spawn_call single", N = 1000, io:format("▶ ~s~n", [Name]), @@ -109,7 +109,7 @@ bench_cast_single() -> {Time, _} = timer:tc(fun() -> lists:foreach(fun(I) -> - Ref = py:cast(math, sqrt, [I]), + Ref = py:spawn_call(math, sqrt, [I]), {ok, _} = py:await(Ref, 5000) end, lists:seq(1, N)) end), @@ -125,7 +125,7 @@ bench_cast_single() -> {Name, PerCall, Throughput}. bench_cast_multiple() -> - Name = "Cast py:cast batch (10 calls)", + Name = "Cast py:spawn_call batch (10 calls)", N = 100, io:format("▶ ~s~n", [Name]), @@ -134,7 +134,7 @@ bench_cast_multiple() -> {Time, _} = timer:tc(fun() -> lists:foreach(fun(Batch) -> %% Start 10 cast calls - Refs = [py:cast(math, sqrt, [Batch * 10 + I]) + Refs = [py:spawn_call(math, sqrt, [Batch * 10 + I]) || I <- lists:seq(1, 10)], %% Await all [{ok, _} = py:await(Ref, 5000) || Ref <- Refs] @@ -153,7 +153,7 @@ bench_cast_multiple() -> {Name, PerBatch, Throughput}. bench_cast_parallel() -> - Name = "Cast py:cast parallel (10 concurrent)", + Name = "Cast py:spawn_call parallel (10 concurrent)", N = 100, io:format("▶ ~s~n", [Name]), @@ -162,7 +162,7 @@ bench_cast_parallel() -> {Time, _} = timer:tc(fun() -> lists:foreach(fun(Batch) -> %% Start 10 cast calls in parallel - Refs = [py:cast(math, factorial, [20 + (Batch rem 10)]) + Refs = [py:spawn_call(math, factorial, [20 + (Batch rem 10)]) || _ <- lists:seq(1, 10)], %% Await all results [py:await(Ref, 5000) || Ref <- Refs] @@ -230,7 +230,7 @@ bench_concurrent_cast() -> {Time, _} = timer:tc(fun() -> Pids = [spawn_link(fun() -> lists:foreach(fun(I) -> - Ref = py:cast(math, factorial, [20 + I]), + Ref = py:spawn_call(math, factorial, [20 + I]), {ok, _} = py:await(Ref, 5000) end, lists:seq(1, CallsPerProc)), Parent ! {done, self()} diff --git a/src/py.erl b/src/py.erl index fbf0f99..a272c6c 100644 --- a/src/py.erl +++ b/src/py.erl @@ -41,6 +41,10 @@ call/5, cast/3, cast/4, + cast/5, + spawn_call/3, + spawn_call/4, + spawn_call/5, await/1, await/2, eval/1, @@ -270,16 +274,42 @@ exec(Ctx, Code) when is_pid(Ctx) -> %%% Asynchronous API %%% ============================================================================ -%% @doc Cast a Python function call, returns immediately with a ref. -%% The call executes in a spawned process. Use await/1,2 to get the result. --spec cast(py_module(), py_func(), py_args()) -> py_ref(). +%% @doc Fire-and-forget Python function call. +-spec cast(py_module(), py_func(), py_args()) -> ok. cast(Module, Func, Args) -> cast(Module, Func, Args, #{}). -%% @doc Cast a Python function call with kwargs. --spec cast(py_module(), py_func(), py_args(), py_kwargs()) -> py_ref(). +%% @doc Fire-and-forget Python function call with context or kwargs. +-spec cast(pid(), py_module(), py_func(), py_args()) -> ok; + (py_module(), py_func(), py_args(), py_kwargs()) -> ok. +cast(Ctx, Module, Func, Args) when is_pid(Ctx) -> + cast(Ctx, Module, Func, Args, #{}); cast(Module, Func, Args, Kwargs) -> - %% Spawn a process to execute the call and return a ref + spawn(fun() -> + Ctx = py_context_router:get_context(), + _ = py_context:call(Ctx, Module, Func, Args, Kwargs) + end), + ok. + +%% @doc Fire-and-forget Python function call with context and kwargs. +-spec cast(pid(), py_module(), py_func(), py_args(), py_kwargs()) -> ok. +cast(Ctx, Module, Func, Args, Kwargs) when is_pid(Ctx) -> + spawn(fun() -> + _ = py_context:call(Ctx, Module, Func, Args, Kwargs) + end), + ok. + +%% @doc Spawn a Python function call, returns immediately with a ref. +-spec spawn_call(py_module(), py_func(), py_args()) -> py_ref(). +spawn_call(Module, Func, Args) -> + spawn_call(Module, Func, Args, #{}). + +%% @doc Spawn a Python function call with context or kwargs. +-spec spawn_call(pid(), py_module(), py_func(), py_args()) -> py_ref(); + (py_module(), py_func(), py_args(), py_kwargs()) -> py_ref(). +spawn_call(Ctx, Module, Func, Args) when is_pid(Ctx) -> + spawn_call(Ctx, Module, Func, Args, #{}); +spawn_call(Module, Func, Args, Kwargs) -> Ref = make_ref(), Parent = self(), spawn(fun() -> @@ -289,6 +319,17 @@ cast(Module, Func, Args, Kwargs) -> end), Ref. +%% @doc Spawn a Python function call with context and kwargs. +-spec spawn_call(pid(), py_module(), py_func(), py_args(), py_kwargs()) -> py_ref(). +spawn_call(Ctx, Module, Func, Args, Kwargs) when is_pid(Ctx) -> + Ref = make_ref(), + Parent = self(), + spawn(fun() -> + Result = py_context:call(Ctx, Module, Func, Args, Kwargs), + Parent ! {py_response, Ref, Result} + end), + Ref. + %% @doc Wait for an async call to complete. -spec await(py_ref()) -> py_result(). await(Ref) -> diff --git a/test/py_SUITE.erl b/test/py_SUITE.erl index 89a3d92..5732276 100644 --- a/test/py_SUITE.erl +++ b/test/py_SUITE.erl @@ -17,6 +17,7 @@ test_eval_complex_locals/1, test_exec/1, test_cast/1, + test_spawn_call/1, test_type_conversions/1, test_nested_types/1, test_timeout/1, @@ -68,6 +69,7 @@ all() -> test_eval_complex_locals, test_exec, test_cast, + test_spawn_call, test_type_conversions, test_nested_types, test_timeout, @@ -204,9 +206,20 @@ def my_func(): ok. test_cast(_Config) -> - Ref1 = py:cast(math, sqrt, [100]), - Ref2 = py:cast(math, sqrt, [144]), + %% Test fire-and-forget cast with erlang.send + Self = erlang:self(), + ok = py:cast(erlang, send, [Self, {<<"result">>, <<"msg1">>}]), + ok = py:cast(erlang, send, [Self, {<<"result">>, <<"msg2">>}]), + %% Wait for results (order may vary) + R1 = receive {<<"result">>, V1} -> V1 after 5000 -> ct:fail(timeout1) end, + R2 = receive {<<"result">>, V2} -> V2 after 5000 -> ct:fail(timeout2) end, + true = lists:sort([R1, R2]) =:= [<<"msg1">>, <<"msg2">>], + ok. +test_spawn_call(_Config) -> + %% Test spawn_call with await + Ref1 = py:spawn_call(math, sqrt, [100]), + Ref2 = py:spawn_call(math, sqrt, [144]), {ok, 10.0} = py:await(Ref1), {ok, 12.0} = py:await(Ref2), ok.