Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,13 @@ application:ensure_all_started(erlang_python).
%% Evaluate with local variables
{ok, 25} = py:eval(<<"x * y">>, #{x => 5, y => 5}).

%% Async calls
Ref = py:cast(math, factorial, [100]),
%% Async calls with await
Ref = py:spawn_call(math, factorial, [100]),
{ok, Result} = py:await(Ref).

%% Fire-and-forget (no result)
ok = py:cast(erlang, send, [self(), {done, <<"task1">>}]).

%% Streaming from generators
{ok, [0,1,4,9,16]} = py:stream_eval(<<"(x**2 for x in range(5))">>).
```
Expand Down Expand Up @@ -443,10 +446,13 @@ escript examples/logging_example.erl
{ok, Result} = py:call(Module, Function, Args, KwArgs).
{ok, Result} = py:call(Module, Function, Args, KwArgs, Timeout).

%% Async
Ref = py:cast(Module, Function, Args).
%% Async with result
Ref = py:spawn_call(Module, Function, Args).
{ok, Result} = py:await(Ref).
{ok, Result} = py:await(Ref, Timeout).

%% Fire-and-forget (no result returned)
ok = py:cast(Module, Function, Args).
```

### Expression Evaluation
Expand Down
10 changes: 4 additions & 6 deletions c_src/py_callback.c
Original file line number Diff line number Diff line change
Expand Up @@ -2330,8 +2330,9 @@ static PyObject *erlang_channel_receive_impl(PyObject *self, PyObject *args) {

/* Need to wait - release GIL and poll */
{
long elapsed_ms = 0;
long elapsed_us = 0;
const long poll_interval_us = 100; /* 100 microseconds */
const long timeout_us = timeout_ms >= 0 ? timeout_ms * 1000 : -1;

Py_BEGIN_ALLOW_THREADS

Expand All @@ -2342,17 +2343,14 @@ static PyObject *erlang_channel_receive_impl(PyObject *self, PyObject *args) {
}

/* Check timeout */
if (timeout_ms >= 0 && elapsed_ms >= timeout_ms) {
if (timeout_us >= 0 && elapsed_us >= timeout_us) {
result = 2; /* Timeout */
break;
}

/* Sleep briefly */
usleep(poll_interval_us);
elapsed_ms += poll_interval_us / 1000;
if (poll_interval_us < 1000) {
elapsed_ms = (elapsed_ms == 0 && poll_interval_us > 0) ? 1 : elapsed_ms;
}
elapsed_us += poll_interval_us;
}

Py_END_ALLOW_THREADS
Expand Down
4 changes: 2 additions & 2 deletions docs/ai-integration.md
Original file line number Diff line number Diff line change
Expand Up @@ -503,9 +503,9 @@ This demonstrates:
For non-blocking LLM calls:

```erlang
%% Start async LLM call
%% Start async LLM call (returns ref for await)
ask_async(Question) ->
py:cast('__main__', generate, [Question, <<"">>]).
py:spawn_call('__main__', generate, [Question, <<"">>]).

%% Gather multiple responses
ask_many(Questions) ->
Expand Down
7 changes: 5 additions & 2 deletions docs/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,16 @@ All operations support optional timeouts:
For non-blocking operations:

```erlang
%% Start async call
Ref = py:cast(math, factorial, [1000]).
%% Start async call (returns ref for await)
Ref = py:spawn_call(math, factorial, [1000]).

%% Do other work...

%% Wait for result
{ok, HugeNumber} = py:await(Ref).

%% Fire-and-forget (no result)
ok = py:cast(some_module, log_event, [EventData]).
```

## Streaming from Generators
Expand Down
10 changes: 6 additions & 4 deletions docs/migration.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ This guide covers breaking changes and migration steps when upgrading from erlan

## Quick Checklist

- [ ] Rename `py:call_async` → `py:cast`
- [ ] Rename `py:call_async` → `py:spawn_call` (with await) or `py:cast` (fire-and-forget)
- [ ] Replace `py:bind`/`py:unbind` with `py_context_router`
- [ ] Replace `py:ctx_*` functions with `py_context:*`
- [ ] Replace `erlang_asyncio` imports with `erlang`
Expand Down Expand Up @@ -148,7 +148,7 @@ N = py_context_router:num_contexts().

## API Changes

### `py:call_async` renamed to `py:cast`
### `py:call_async` renamed to `py:spawn_call`

The function for non-blocking Python calls has been renamed to follow gen_server conventions:

Expand All @@ -160,11 +160,13 @@ Ref = py:call_async(math, factorial, [100]),

**After (v2.0):**
```erlang
Ref = py:cast(math, factorial, [100]),
Ref = py:spawn_call(math, factorial, [100]),
{ok, Result} = py:await(Ref).
```

The semantics are identical - only the name changed.
The semantics are identical - `spawn_call` replaces `async_call`.

Note: `py:cast/3,4` is now fire-and-forget (returns `ok`, no await).

### `erlang_asyncio` module removed

Expand Down
2 changes: 1 addition & 1 deletion docs/pools.md
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ py:register_pool(io, requests), %% API calls

process_batch(Items) ->
%% Parallel fetch from S3 (io pool)
Futures = [py:cast(boto3, download_file, [Key]) || Key <- Items],
Futures = [py:spawn_call(boto3, download_file, [Key]) || Key <- Items],
Files = [py:await(F) || F <- Futures],

%% Process with ML model (default pool - doesn't block I/O)
Expand Down
6 changes: 3 additions & 3 deletions examples/basic_example.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ main(_) ->

io:format("~n=== Async Calls ===~n~n"),

%% Async call
Ref1 = py:cast(math, factorial, [10]),
Ref2 = py:cast(math, factorial, [20]),
%% Async call with spawn_call/await
Ref1 = py:spawn_call(math, factorial, [10]),
Ref2 = py:spawn_call(math, factorial, [20]),

{ok, Fact10} = py:await(Ref1),
{ok, Fact20} = py:await(Ref2),
Expand Down
14 changes: 7 additions & 7 deletions examples/benchmark_compare.erl
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,15 @@ bench_sync_eval() ->
%% ============================================================================

bench_cast_single() ->
Name = "Cast py:cast single",
Name = "Cast py:spawn_call single",
N = 1000,

io:format("▶ ~s~n", [Name]),
io:format(" Iterations: ~p~n", [N]),

{Time, _} = timer:tc(fun() ->
lists:foreach(fun(I) ->
Ref = py:cast(math, sqrt, [I]),
Ref = py:spawn_call(math, sqrt, [I]),
{ok, _} = py:await(Ref, 5000)
end, lists:seq(1, N))
end),
Expand All @@ -125,7 +125,7 @@ bench_cast_single() ->
{Name, PerCall, Throughput}.

bench_cast_multiple() ->
Name = "Cast py:cast batch (10 calls)",
Name = "Cast py:spawn_call batch (10 calls)",
N = 100,

io:format("▶ ~s~n", [Name]),
Expand All @@ -134,7 +134,7 @@ bench_cast_multiple() ->
{Time, _} = timer:tc(fun() ->
lists:foreach(fun(Batch) ->
%% Start 10 cast calls
Refs = [py:cast(math, sqrt, [Batch * 10 + I])
Refs = [py:spawn_call(math, sqrt, [Batch * 10 + I])
|| I <- lists:seq(1, 10)],
%% Await all
[{ok, _} = py:await(Ref, 5000) || Ref <- Refs]
Expand All @@ -153,7 +153,7 @@ bench_cast_multiple() ->
{Name, PerBatch, Throughput}.

bench_cast_parallel() ->
Name = "Cast py:cast parallel (10 concurrent)",
Name = "Cast py:spawn_call parallel (10 concurrent)",
N = 100,

io:format("▶ ~s~n", [Name]),
Expand All @@ -162,7 +162,7 @@ bench_cast_parallel() ->
{Time, _} = timer:tc(fun() ->
lists:foreach(fun(Batch) ->
%% Start 10 cast calls in parallel
Refs = [py:cast(math, factorial, [20 + (Batch rem 10)])
Refs = [py:spawn_call(math, factorial, [20 + (Batch rem 10)])
|| _ <- lists:seq(1, 10)],
%% Await all results
[py:await(Ref, 5000) || Ref <- Refs]
Expand Down Expand Up @@ -230,7 +230,7 @@ bench_concurrent_cast() ->
{Time, _} = timer:tc(fun() ->
Pids = [spawn_link(fun() ->
lists:foreach(fun(I) ->
Ref = py:cast(math, factorial, [20 + I]),
Ref = py:spawn_call(math, factorial, [20 + I]),
{ok, _} = py:await(Ref, 5000)
end, lists:seq(1, CallsPerProc)),
Parent ! {done, self()}
Expand Down
53 changes: 47 additions & 6 deletions src/py.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@
call/5,
cast/3,
cast/4,
cast/5,
spawn_call/3,
spawn_call/4,
spawn_call/5,
await/1,
await/2,
eval/1,
Expand Down Expand Up @@ -270,16 +274,42 @@ exec(Ctx, Code) when is_pid(Ctx) ->
%%% Asynchronous API
%%% ============================================================================

%% @doc Cast a Python function call, returns immediately with a ref.
%% The call executes in a spawned process. Use await/1,2 to get the result.
-spec cast(py_module(), py_func(), py_args()) -> py_ref().
%% @doc Fire-and-forget Python function call.
-spec cast(py_module(), py_func(), py_args()) -> ok.
cast(Module, Func, Args) ->
cast(Module, Func, Args, #{}).

%% @doc Cast a Python function call with kwargs.
-spec cast(py_module(), py_func(), py_args(), py_kwargs()) -> py_ref().
%% @doc Fire-and-forget Python function call with context or kwargs.
-spec cast(pid(), py_module(), py_func(), py_args()) -> ok;
(py_module(), py_func(), py_args(), py_kwargs()) -> ok.
cast(Ctx, Module, Func, Args) when is_pid(Ctx) ->
cast(Ctx, Module, Func, Args, #{});
cast(Module, Func, Args, Kwargs) ->
%% Spawn a process to execute the call and return a ref
spawn(fun() ->
Ctx = py_context_router:get_context(),
_ = py_context:call(Ctx, Module, Func, Args, Kwargs)
end),
ok.

%% @doc Fire-and-forget Python function call with context and kwargs.
-spec cast(pid(), py_module(), py_func(), py_args(), py_kwargs()) -> ok.
cast(Ctx, Module, Func, Args, Kwargs) when is_pid(Ctx) ->
spawn(fun() ->
_ = py_context:call(Ctx, Module, Func, Args, Kwargs)
end),
ok.

%% @doc Spawn a Python function call, returns immediately with a ref.
-spec spawn_call(py_module(), py_func(), py_args()) -> py_ref().
spawn_call(Module, Func, Args) ->
spawn_call(Module, Func, Args, #{}).

%% @doc Spawn a Python function call with context or kwargs.
-spec spawn_call(pid(), py_module(), py_func(), py_args()) -> py_ref();
(py_module(), py_func(), py_args(), py_kwargs()) -> py_ref().
spawn_call(Ctx, Module, Func, Args) when is_pid(Ctx) ->
spawn_call(Ctx, Module, Func, Args, #{});
spawn_call(Module, Func, Args, Kwargs) ->
Ref = make_ref(),
Parent = self(),
spawn(fun() ->
Expand All @@ -289,6 +319,17 @@ cast(Module, Func, Args, Kwargs) ->
end),
Ref.

%% @doc Spawn a Python function call with context and kwargs.
-spec spawn_call(pid(), py_module(), py_func(), py_args(), py_kwargs()) -> py_ref().
spawn_call(Ctx, Module, Func, Args, Kwargs) when is_pid(Ctx) ->
Ref = make_ref(),
Parent = self(),
spawn(fun() ->
Result = py_context:call(Ctx, Module, Func, Args, Kwargs),
Parent ! {py_response, Ref, Result}
end),
Ref.

%% @doc Wait for an async call to complete.
-spec await(py_ref()) -> py_result().
await(Ref) ->
Expand Down
17 changes: 15 additions & 2 deletions test/py_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
test_eval_complex_locals/1,
test_exec/1,
test_cast/1,
test_spawn_call/1,
test_type_conversions/1,
test_nested_types/1,
test_timeout/1,
Expand Down Expand Up @@ -68,6 +69,7 @@ all() ->
test_eval_complex_locals,
test_exec,
test_cast,
test_spawn_call,
test_type_conversions,
test_nested_types,
test_timeout,
Expand Down Expand Up @@ -204,9 +206,20 @@ def my_func():
ok.

test_cast(_Config) ->
Ref1 = py:cast(math, sqrt, [100]),
Ref2 = py:cast(math, sqrt, [144]),
%% Test fire-and-forget cast with erlang.send
Self = erlang:self(),
ok = py:cast(erlang, send, [Self, {<<"result">>, <<"msg1">>}]),
ok = py:cast(erlang, send, [Self, {<<"result">>, <<"msg2">>}]),
%% Wait for results (order may vary)
R1 = receive {<<"result">>, V1} -> V1 after 5000 -> ct:fail(timeout1) end,
R2 = receive {<<"result">>, V2} -> V2 after 5000 -> ct:fail(timeout2) end,
true = lists:sort([R1, R2]) =:= [<<"msg1">>, <<"msg2">>],
ok.

test_spawn_call(_Config) ->
%% Test spawn_call with await
Ref1 = py:spawn_call(math, sqrt, [100]),
Ref2 = py:spawn_call(math, sqrt, [144]),
{ok, 10.0} = py:await(Ref1),
{ok, 12.0} = py:await(Ref2),
ok.
Expand Down
Loading