diff --git a/CHANGELOG.md b/CHANGELOG.md index bb18261eaa..b2fb3eedf5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added `erlang:display_string/1` and `erlang:display_string/2` - Added Thumb-2 support to armv6m JIT backend, optimizing code for ARMv7-M and later cores - Added support for `binary:split/2,3` list patterns and `trim` / `trim_all` options +- Added Erlang distribution over serial (uart) ### Changed - ~10% binary size reduction by rewriting module loading logic diff --git a/doc/src/distributed-erlang.md b/doc/src/distributed-erlang.md index 7cfd9553ab..13d50a68b4 100644 --- a/doc/src/distributed-erlang.md +++ b/doc/src/distributed-erlang.md @@ -14,10 +14,15 @@ Distribution is currently available on all platforms with TCP/IP communication, - ESP32 - RP2 (Pico) -Two examples are provided: +Distribution over serial (UART) is also available for point-to-point +connections between any two nodes, including microcontrollers without +networking (e.g. STM32). See [Serial distribution](#serial-distribution). + +Three examples are provided: - disterl in `examples/erlang/disterl.erl`: distribution on Unix systems - epmd\_disterl in `examples/erlang/esp32/epmd_disterl.erl`: distribution on ESP32 devices +- serial\_disterl in `examples/erlang/serial_disterl.erl`: distribution over serial (ESP32 and Unix) ## Starting and stopping distribution @@ -94,6 +99,147 @@ fun (DistCtrlr, Length :: pos_integer(), Timeout :: timeout()) -> {ok, Packet} | AtomVM's distribution is based on `socket_dist` and `socket_dist_controller` modules which can also be used with BEAM by definining `BEAM_INTERFACE` to adjust for the difference. +## Serial distribution + +AtomVM supports distribution over serial (UART) connections using the +`serial_dist` module. This is useful for microcontrollers that lack +WiFi/TCP (e.g. STM32) but have UART, and for testing distribution +locally using virtual serial ports. + +### Quick start + +```erlang +{ok, _} = net_kernel:start('mynode@serial.local', #{ + name_domain => longnames, + proto_dist => serial_dist, + avm_dist_opts => #{ + uart_opts => [{peripheral, "UART1"}, {speed, 115200}, + {tx, 17}, {rx, 16}], + uart_module => uart + } +}). +``` + +On Unix, the `peripheral` is a device path such as `"/dev/ttyUSB0"` and +the `uart_module` is `uart` from the `avm_unix` library. + +### serial\_dist options + +- `uart_opts` — proplist passed to `UartModule:open/1` for a single port + (see `uart_hal` for common parameters: `peripheral`, `speed`, + `data_bits`, `stop_bits`, `parity`, `flow_control`) +- `uart_ports` — list of proplists, one per UART port. Use instead of + `uart_opts` when connecting to multiple peers. +- `uart_module` — module implementing the `uart_hal` behaviour. Defaults + to `uart`. + +### Wire protocol + +All packets on the wire use the same frame format: + +``` +<<16#AA, 16#55, Length:LenBits/big, Payload:Length/binary, CRC32:32/big>> +``` + +where `LenBits` is 16 during the handshake phase and 32 during the data +phase. The CRC32 covers the `Length` and `Payload` bytes (everything +between the sync marker and the CRC itself). + +The receiver scans for the `<<16#AA, 16#55>>` sync marker, reads the +length field, validates it against a maximum frame size (to reject false +sync matches where the marker appears in stale data), then verifies the +CRC32. On CRC failure the connection is torn down. + +**Sync markers** + +Both sides periodically send bare 2-byte sync markers +(`<<16#AA, 16#55>>`) on the UART outside of any frame. These serve two +purposes: + +- **Liveness detection**: a node knows its peer is alive when it + receives sync markers. +- **Stale data recovery**: after a failed handshake attempt, leftover + bytes remain in the UART buffer. The frame scanner skips over any + data (including stale sync markers) that does not form a valid frame + with a correct length and CRC. + +**Handshake phase (16-bit length)** + +During the Erlang distribution handshake, the `Length` field is 16 bits. +The handshake follows the standard Erlang distribution protocol +(send\_name, send\_status, send\_challenge, send\_challenge\_reply, +send\_challenge\_ack). + +**Data phase (32-bit length)** + +After the handshake completes, the `Length` field switches to 32 bits. +Tick (keepalive) messages are sent as a frame with a zero-length payload +(i.e. `Length = 0`). + +### Peer-to-peer connection model + +Unlike TCP distribution which uses a client/server model (one side +listens, the other connects), serial is point-to-point: both nodes +share a single UART link. + +A **link manager** process on each node is the sole owner of UART +reads. On each iteration it: + +1. Checks its mailbox for a `setup` request from `net_kernel` + (non-blocking). If found, enters the **setup** path (initiator) + immediately without proceeding to subsequent steps. +2. Sends a sync marker. +3. Reads from the UART with a short timeout. +4. Passes the buffer to `scan_frame` which searches for a valid framed + handshake packet. +5. If a complete or partial frame is detected, enters the **accept** + path (responder). +6. Otherwise, loops. + +This design ensures only one process reads from the UART at any time, +avoiding the race condition that would occur if separate accept and +setup processes competed for the same byte stream. + +If a handshake fails (the distribution controller process exits), the +link manager flushes stale `setup` messages from its mailbox and +restarts the loop, allowing retries. + +### Testing with socat + +On Unix, `socat` can create virtual serial port pairs for testing: + +```bash +socat -d -d pty,raw,echo=0 pty,raw,echo=0 +``` + +This creates two pseudo-terminal devices (e.g. `/dev/ttys003` and +`/dev/ttys004`) connected back-to-back. Each AtomVM node uses one side: + +```erlang +%% Node A +{ok, _} = net_kernel:start('a@serial.local', #{ + name_domain => longnames, + proto_dist => serial_dist, + avm_dist_opts => #{ + uart_opts => [{peripheral, "/dev/ttys003"}, {speed, 115200}], + uart_module => uart + } +}). + +%% Node B (separate AtomVM process) +{ok, _} = net_kernel:start('b@serial.local', #{ + name_domain => longnames, + proto_dist => serial_dist, + avm_dist_opts => #{ + uart_opts => [{peripheral, "/dev/ttys004"}, {speed, 115200}], + uart_module => uart + } +}). + +%% From Node B, trigger autoconnect: +{some_registered_name, 'a@serial.local'} ! {self(), hello}. +``` + ## Distribution features Distribution implementation is (very) partial. The most basic features are available: diff --git a/examples/erlang/CMakeLists.txt b/examples/erlang/CMakeLists.txt index e008edd574..3ec47c4203 100644 --- a/examples/erlang/CMakeLists.txt +++ b/examples/erlang/CMakeLists.txt @@ -43,6 +43,7 @@ pack_runnable(network_console network_console estdlib eavmlib alisp) pack_runnable(logging_example logging_example estdlib eavmlib) pack_runnable(http_client http_client estdlib eavmlib avm_network) pack_runnable(disterl disterl estdlib) +pack_runnable(serial_disterl serial_disterl eavmlib estdlib DIALYZE_AGAINST avm_esp32 avm_unix) pack_runnable(i2c_scanner i2c_scanner eavmlib estdlib DIALYZE_AGAINST avm_esp32 avm_rp2 avm_stm32) pack_runnable(i2c_lis3dh i2c_lis3dh eavmlib estdlib DIALYZE_AGAINST avm_esp32 avm_rp2 avm_stm32) pack_runnable(spi_flash spi_flash eavmlib estdlib DIALYZE_AGAINST avm_esp32 avm_rp2) diff --git a/examples/erlang/serial_disterl.erl b/examples/erlang/serial_disterl.erl new file mode 100644 index 0000000000..cd01106b16 --- /dev/null +++ b/examples/erlang/serial_disterl.erl @@ -0,0 +1,151 @@ +% +% This file is part of AtomVM. +% +% Copyright 2026 Paul Guyot +% +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% +% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later +% + +%% @doc Example: distributed Erlang over serial (UART). +%% +%% This example starts distribution using one or more UART connections +%% instead of TCP/IP. It works on ESP32 and Unix (using real serial +%% devices or virtual serial ports created with socat). +%% +%%

ESP32 wiring (single peer)

+%% +%% ``` +%% ESP32 TX (GPIO 17) -> Peer RX +%% ESP32 RX (GPIO 16) -> Peer TX +%% ESP32 GND -> Peer GND +%% ''' +%% +%%

ESP32 with two peers

+%% +%% Many ESP32 boards have UART1 and UART2. Set `SERIAL_MULTI=true' +%% to connect to two peers simultaneously: +%% +%% ``` +%% UART1: TX=17, RX=16 -> Peer A +%% UART2: TX=4, RX=5 -> Peer B +%% ''' +%% +%%

Unix with socat

+%% +%% Create a virtual serial port pair: +%% ``` +%% socat -d -d pty,raw,echo=0 pty,raw,echo=0 +%% ''' +%% Then set the SERIAL_DEVICE environment variable to one of the pty +%% paths before running this example. The peer node uses the other pty. +%% +%%

Connecting

+%% +%% The node name is derived from the serial device, e.g. +%% `ttys003@serial.local' or `uart1@serial.local'. Once both nodes are +%% running, trigger autoconnect from either side: +%% ``` +%% {serial_disterl, 'ttys003@serial.local'} ! {hello, node()}. +%% ''' +-module(serial_disterl). + +-export([start/0]). + +start() -> + UartConfigs = uart_configs(), + NodeName = make_node_name(hd(UartConfigs)), + DistOpts = + case UartConfigs of + [Single] -> #{uart_opts => Single}; + Multiple -> #{uart_ports => Multiple} + end, + {ok, _NetKernelPid} = net_kernel:start(NodeName, #{ + name_domain => longnames, + proto_dist => serial_dist, + avm_dist_opts => DistOpts + }), + io:format("Distribution started over serial (~p port(s))~n", [length(UartConfigs)]), + io:format("Node: ~p~n", [node()]), + net_kernel:set_cookie(<<"AtomVM">>), + io:format("Cookie: ~s~n", [net_kernel:get_cookie()]), + register(serial_disterl, self()), + io:format("Registered as 'serial_disterl'. Waiting for messages.~n"), + io:format("From the peer:~n"), + io:format(" {serial_disterl, '~s'} ! {hello, node()}.~n", [node()]), + loop(). + +%% Build a node name from the serial device. +%% e.g. "UART1" -> 'uart1@serial.local' +%% "/dev/ttys003" -> 'ttys003@serial.local' +make_node_name(UartOpts) -> + Peripheral = proplists:get_value(peripheral, UartOpts, "serial"), + BaseName = basename(Peripheral), + list_to_atom(string:to_lower(BaseName) ++ "@serial.local"). + +basename(Path) -> + case lists:last(string:split(Path, "/", all)) of + [] -> Path; + Name -> Name + end. + +%% Platform-specific UART configuration. +%% Returns a list of UART option proplists (one per port). +uart_configs() -> + case erlang:system_info(machine) of + "ATOM" -> + case atomvm:platform() of + esp32 -> + case os:getenv("SERIAL_MULTI") of + "true" -> + %% Two UARTs: connect to two peers + [ + [{peripheral, "UART1"}, {speed, 115200}, {tx, 17}, {rx, 16}], + [{peripheral, "UART2"}, {speed, 115200}, {tx, 4}, {rx, 5}] + ]; + _ -> + [[{peripheral, "UART1"}, {speed, 115200}, {tx, 17}, {rx, 16}]] + end; + generic_unix -> + Device = os:getenv("SERIAL_DEVICE"), + case Device of + false -> + io:format("Error: set SERIAL_DEVICE env var to a serial port path~n"), + io:format(" e.g. /dev/ttyUSB0 or a socat pty~n"), + exit(no_serial_device); + _ -> + [[{peripheral, Device}, {speed, 115200}]] + end; + Other -> + io:format("Error: unsupported platform ~p~n", [Other]), + exit({unsupported_platform, Other}) + end; + "BEAM" -> + io:format("Error: this example requires AtomVM~n"), + io:format(" See serial_dist module doc for BEAM usage~n"), + exit(beam_not_supported) + end. + +loop() -> + receive + quit -> + io:format("Received quit, stopping.~n"), + ok; + {hello, From} -> + io:format("Hello from ~p!~n", [From]), + loop(); + Other -> + io:format("Received: ~p~n", [Other]), + loop() + end. diff --git a/libs/estdlib/src/CMakeLists.txt b/libs/estdlib/src/CMakeLists.txt index 1ed38c99f5..8b66607180 100644 --- a/libs/estdlib/src/CMakeLists.txt +++ b/libs/estdlib/src/CMakeLists.txt @@ -60,6 +60,7 @@ set(ERLANG_MODULES maps math net + os proc_lib sys logger @@ -68,6 +69,8 @@ set(ERLANG_MODULES queue sets socket + serial_dist + serial_dist_controller socket_dist socket_dist_controller ssl diff --git a/libs/estdlib/src/net_kernel.erl b/libs/estdlib/src/net_kernel.erl index 8510fb1fa2..0a4033addb 100644 --- a/libs/estdlib/src/net_kernel.erl +++ b/libs/estdlib/src/net_kernel.erl @@ -285,7 +285,10 @@ handle_cast(_Message, State) -> {noreply, State}. %% @hidden -handle_info({accept, AcceptPid, SocketPid, inet, tcp}, #state{proto_dist = ProtoDist} = State) -> +handle_info( + {accept, AcceptPid, SocketPid, _Family, _Protocol}, + #state{proto_dist = ProtoDist} = State +) -> Pid = ProtoDist:accept_connection(AcceptPid, SocketPid, State#state.node, [], ?SETUPTIME), AcceptPid ! {self(), controller, Pid}, {noreply, State}; diff --git a/libs/estdlib/src/net_kernel_sup.erl b/libs/estdlib/src/net_kernel_sup.erl index 8015a8869e..a5bced0412 100644 --- a/libs/estdlib/src/net_kernel_sup.erl +++ b/libs/estdlib/src/net_kernel_sup.erl @@ -72,24 +72,36 @@ stop() -> start_link(Options) -> supervisor:start_link({local, ?MODULE}, ?MODULE, [Options]). -init(Options) -> - ChildrenSpec = [ - #{ - id => erl_epmd, - start => {erl_epmd, start_link, []}, - restart => permanent, - shutdown => 2000, - type => worker, - modules => [erl_epmd] - }, - #{ - id => net_kernel, - start => {net_kernel, start_link, Options}, - restart => permanent, - shutdown => 2000, - type => worker, - modules => [net_kernel] - } - ], +init([Options]) -> + ProtoDist = maps:get(proto_dist, Options, socket_dist), + EpmdChildren = + case ProtoDist of + serial_dist -> + %% Serial distribution doesn't need EPMD + []; + _ -> + [ + #{ + id => erl_epmd, + start => {erl_epmd, start_link, []}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => [erl_epmd] + } + ] + end, + ChildrenSpec = + EpmdChildren ++ + [ + #{ + id => net_kernel, + start => {net_kernel, start_link, [Options]}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => [net_kernel] + } + ], SupFlags = #{strategy => one_for_all}, {ok, {SupFlags, ChildrenSpec}}. diff --git a/libs/estdlib/src/serial_dist.erl b/libs/estdlib/src/serial_dist.erl new file mode 100644 index 0000000000..dbae45fb4b --- /dev/null +++ b/libs/estdlib/src/serial_dist.erl @@ -0,0 +1,492 @@ +% +% This file is part of AtomVM. +% +% Copyright 2026 Paul Guyot +% +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% +% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later +% + +%%----------------------------------------------------------------------------- +%% @doc Distribution over serial (UART) connections. +%% +%% This module implements the Erlang distribution protocol over UART, +%% enabling distributed Erlang between devices connected via serial lines. +%% This is particularly useful for microcontrollers that lack WiFi/TCP +%% (e.g. STM32) but have UART available. +%% +%%

Multi-port support

+%% +%% Many MCUs have multiple UARTs (e.g. UART0 for console, UART1 and UART2 +%% for peer connections). This module supports opening several serial ports, +%% each connecting to a different peer node. Pass a list of UART +%% configurations via the `uart_ports' option: +%% +%% ``` +%% {ok, _} = net_kernel:start('mynode@serial.local', #{ +%% name_domain => longnames, +%% proto_dist => serial_dist, +%% avm_dist_opts => #{ +%% uart_ports => [ +%% [{peripheral, "UART1"}, {speed, 115200}, {tx, 17}, {rx, 16}], +%% [{peripheral, "UART2"}, {speed, 115200}, {tx, 4}, {rx, 5}] +%% ], +%% uart_module => uart +%% } +%% }). +%% ''' +%% +%% For a single port, the `uart_opts' key can also be used: +%% +%% ``` +%% avm_dist_opts => #{ +%% uart_opts => [{peripheral, "UART1"}, {speed, 115200}] +%% } +%% ''' +%% +%%

Peer-to-peer model

+%% +%% Each UART handles exactly one connection. A coordinator process +%% manages one link manager per UART port. Each link manager owns all +%% reads from its UART and arbitrates between incoming connections +%% (accept) and outgoing connections (setup). +%% +%% The link managers periodically send sync markers (`<<16#AA, 16#55>>') +%% on their UART so the peer can detect we are alive. When a peer wants +%% to connect, it sends a preamble of repeated sync markers followed by a +%% framed handshake packet. Each frame on the wire uses the format: +%% +%% ``` +%% <<16#AA, 16#55, Length, Payload, CRC32:32>> +%% ''' +%% +%% The CRC32 covers the Length and Payload bytes. False sync matches +%% (where `<<16#AA, 16#55>>' appears in payload data) are rejected by +%% maximum frame size checks and CRC verification. On CRC failure +%% during an active connection, the connection is torn down. During +%% pre-connection scanning, the buffer is discarded and retried. +%% +%%

BEAM compatibility

+%% +%% This module also works on BEAM (standard Erlang/OTP). On BEAM, start +%% the VM with `-proto_dist serial' and configure UART options via +%% application environment before calling `net_kernel:start/1': +%% +%% ``` +%% application:set_env(serial_dist, dist_opts, #{ +%% uart_opts => [{peripheral, "/dev/ttyUSB0"}, {speed, 115200}], +%% uart_module => my_uart_module +%% }). +%% net_kernel:start(['mynode@serial.local', longnames]). +%% ''' +%% @end +%%----------------------------------------------------------------------------- +-module(serial_dist). + +% dist interface +-export([ + listen/1, + listen/2, + accept/1, + accept_connection/5, + setup/5, + close/1, + select/1, + address/0 +]). + +-include_lib("kernel/include/net_address.hrl"). +-include_lib("kernel/include/dist.hrl"). +-include_lib("kernel/include/dist_util.hrl"). + +-define(SYNC_MAGIC, <<16#AA, 16#55>>). + +-type uart_handle() :: pid() | port(). +-type listen_handle() :: nonempty_list({uart_handle(), module()}). + +-spec listen(string()) -> {ok, {listen_handle(), #net_address{}, pos_integer()}} | {error, any()}. +listen(Name) -> + listen(Name, #{}). + +-spec listen(string(), map() | string()) -> + {ok, {listen_handle(), #net_address{}, pos_integer()}} | {error, any()}. +listen(Name, Opts) when is_map(Opts) -> + %% BEAM OTP 25+ may also pass an empty map. + %% Merge with application env to pick up BEAM-side config. + EnvOpts = beam_env_opts(), + EffectiveOpts = maps:merge(EnvOpts, Opts), + listen_impl(Name, EffectiveOpts); +listen(Name, _Host) -> + %% BEAM path (OTP < 25): Host is a hostname string. + listen_impl(Name, beam_env_opts()). + +beam_env_opts() -> + case application:get_env(serial_dist, dist_opts) of + {ok, O} when is_map(O) -> O; + _ -> #{} + end. + +listen_impl(_Name, Opts) -> + UartMod = maps:get(uart_module, Opts, uart), + UartConfigs = + case maps:find(uart_ports, Opts) of + {ok, Ports} -> + Ports; + error -> + case maps:find(uart_opts, Opts) of + {ok, UartOpts} -> + [UartOpts]; + error -> + %% Fallback: try SERIAL_DEVICE env var + case os:getenv("SERIAL_DEVICE") of + false -> + [[{peripheral, "UART1"}, {speed, 115200}]]; + Device -> + [[{peripheral, Device}, {speed, 115200}]] + end + end + end, + OpenPorts = lists:filtermap( + fun(UartOpts) -> + case UartMod:open(UartOpts) of + {error, Reason} -> + io:format("serial_dist: failed to open UART ~p: ~p~n", [UartOpts, Reason]), + false; + UartPort -> + {true, {UartPort, UartMod}} + end + end, + UartConfigs + ), + case OpenPorts of + [] -> + {error, no_uart_ports}; + _ -> + Address = #net_address{ + address = serial, + host = serial, + protocol = serial, + family = serial + }, + {ok, {OpenPorts, Address, 1}} + end. + +-spec address() -> #net_address{}. +address() -> + #net_address{ + address = serial, + host = serial, + protocol = serial, + family = serial + }. + +%% @doc Called by `net_kernel' after `listen/1' succeeds. +%% Spawns a coordinator process and one link manager per UART port. +-spec accept(listen_handle() | {uart_handle(), module()}) -> pid(). +accept(Ports) when is_list(Ports) -> + Kernel = self(), + spawn_link(fun() -> + register(serial_dist_link_manager, self()), + %% Trap exits so a single link manager crash doesn't + %% tear down the coordinator and all other managers. + process_flag(trap_exit, true), + Coordinator = self(), + Managers = [ + spawn_link(fun() -> + link_manager(Coordinator, UartPort, UartMod, <<>>) + end) + || {UartPort, UartMod} <- Ports + ], + InitialStates = maps:from_list([{M, idle} || M <- Managers]), + coordinator_loop(Kernel, InitialStates, [], #{}) + end); +accept({UartPort, UartMod}) -> + accept([{UartPort, UartMod}]). + +%%-------------------------------------------------------------------- +%% Coordinator +%% +%% The coordinator routes messages between link managers and +%% the net_kernel. It tracks which managers are idle vs busy +%% (handling a connection). It: +%% - Forwards accept notifications from link managers to kernel +%% - Forwards controller assignments from kernel back to managers +%% - Broadcasts setup requests to all idle managers, awards the +%% first responder, and aborts the rest +%%-------------------------------------------------------------------- + +coordinator_loop(Kernel, ManagerStates, PendingAccepts, SetupAwards) -> + receive + %% A link manager detected an incoming connection + {link_accept, ManagerPid, Ctrl} -> + Kernel ! {accept, self(), Ctrl, serial, serial}, + NewStates = maps:put(ManagerPid, busy, ManagerStates), + coordinator_loop(Kernel, NewStates, PendingAccepts ++ [ManagerPid], SetupAwards); + %% Kernel accepts the connection + {Kernel, controller, SupervisorPid} -> + case PendingAccepts of + [ManagerPid | Rest] -> + ManagerPid ! {coordinator_accept, SupervisorPid}, + coordinator_loop(Kernel, ManagerStates, Rest, SetupAwards); + [] -> + io:format("serial_dist: controller assigned with no pending accepts~n"), + coordinator_loop(Kernel, ManagerStates, PendingAccepts, SetupAwards) + end; + %% Kernel rejects the connection + {Kernel, unsupported_protocol} -> + case PendingAccepts of + [ManagerPid | Rest] -> + ManagerPid ! {coordinator_reject}, + coordinator_loop(Kernel, ManagerStates, Rest, SetupAwards); + [] -> + coordinator_loop(Kernel, ManagerStates, PendingAccepts, SetupAwards) + end; + %% Outgoing connection request: broadcast to all idle managers + {setup, SetupPid} -> + IdleManagers = [M || {M, idle} <- maps:to_list(ManagerStates)], + case IdleManagers of + [] -> + SetupPid ! {link_manager_unavailable}, + coordinator_loop(Kernel, ManagerStates, PendingAccepts, SetupAwards); + _ -> + monitor(process, SetupPid), + lists:foreach(fun(M) -> M ! {setup, SetupPid} end, IdleManagers), + NewAwards = maps:put(SetupPid, false, SetupAwards), + coordinator_loop(Kernel, ManagerStates, PendingAccepts, NewAwards) + end; + %% A link manager created a controller for an outgoing setup + {setup_ctrl, ManagerPid, Ctrl, SetupPid} -> + case maps:find(SetupPid, SetupAwards) of + {ok, false} -> + %% First responder wins: forward controller to setup process + SetupPid ! {link_manager, Ctrl}, + ManagerPid ! {setup_awarded}, + NewStates = maps:put(ManagerPid, busy, ManagerStates), + NewAwards = maps:remove(SetupPid, SetupAwards), + coordinator_loop(Kernel, NewStates, PendingAccepts, NewAwards); + _ -> + %% Already awarded or unknown: abort this attempt + ManagerPid ! {setup_abort}, + coordinator_loop(Kernel, ManagerStates, PendingAccepts, SetupAwards) + end; + %% Setup process died before being awarded -- clean up so late + %% responders get aborted instead of sending to a dead process. + {'DOWN', _Ref, process, SetupPid, _Reason} -> + NewAwards = maps:remove(SetupPid, SetupAwards), + coordinator_loop(Kernel, ManagerStates, PendingAccepts, NewAwards); + %% A link manager's connection ended, back to idle + {manager_idle, ManagerPid} -> + NewStates = maps:put(ManagerPid, idle, ManagerStates), + coordinator_loop(Kernel, NewStates, PendingAccepts, SetupAwards); + %% net_kernel (or accept process) died -- shut down. + {'EXIT', Kernel, Reason} -> + exit(Reason); + %% A link manager crashed (UART I/O error etc.) -- remove it so + %% the coordinator continues running for remaining managers. + {'EXIT', ManagerPid, _Reason} -> + NewStates = maps:remove(ManagerPid, ManagerStates), + coordinator_loop(Kernel, NewStates, PendingAccepts, SetupAwards) + end. + +%%-------------------------------------------------------------------- +%% Link manager +%% +%% One link manager per UART port. It: +%% 1. Checks mailbox for {setup, Pid} (non-blocking). +%% If found, enters setup (initiator) path immediately. +%% 2. Sends a sync marker so the peer knows we are alive +%% 3. Reads from UART with a short timeout +%% 4. Tries to parse a framed handshake packet (scan_frame) +%% 5. If a complete or partial frame is detected -> accept path +%% 6. If only sync bytes -> loop +%% +%% On recovery after a failed handshake, stale setup messages are +%% flushed and the loop restarts cleanly. +%%-------------------------------------------------------------------- + +link_manager(Coordinator, UartPort, UartMod, Buffer) -> + %% Check for setup request from coordinator + receive + {setup, SetupPid} -> + do_setup_handshake(Coordinator, UartPort, UartMod, SetupPid) + after 0 -> + ok + end, + %% Send sync so peer knows we are alive + case UartMod:write(UartPort, ?SYNC_MAGIC) of + ok -> + ok; + {error, WriteReason} -> + io:format("serial_dist: UART write error: ~p~n", [WriteReason]), + exit({uart_write_error, WriteReason}) + end, + %% Read from UART + NewBuffer = + case UartMod:read(UartPort, 500) of + {ok, Data} -> + <>; + {error, timeout} -> + Buffer; + {error, Reason} -> + io:format("serial_dist: UART read error: ~p~n", [Reason]), + exit({uart_read_error, Reason}) + end, + %% Try to detect a framed handshake packet + case serial_dist_controller:scan_frame(NewBuffer, 16) of + {ok, _Payload, _Rest} -> + %% Complete frame found; pass raw buffer to accept + do_accept_handshake(Coordinator, UartPort, UartMod, NewBuffer); + {crc_error, _} -> + %% Bad data; discard and loop + link_manager(Coordinator, UartPort, UartMod, <<>>); + {need_more, Trimmed} when byte_size(Trimmed) > 4 -> + %% Partial frame in flight; enter accept, controller will read more + do_accept_handshake(Coordinator, UartPort, UartMod, Trimmed); + {need_more, Trimmed} -> + %% Only sync bytes or too little data; keep buffered + link_manager(Coordinator, UartPort, UartMod, Trimmed) + end. + +%% Responder: handshake data arrived from peer. +do_accept_handshake(Coordinator, UartPort, UartMod, Data) -> + {ok, Ctrl} = serial_dist_controller:start(UartPort, UartMod, Data), + Coordinator ! {link_accept, self(), Ctrl}, + receive + {coordinator_accept, SupervisorPid} -> + true = serial_dist_controller:supervisor(Ctrl, SupervisorPid); + {coordinator_reject} -> + exit(unsupported_protocol) + end, + %% Monitor controller -- if handshake fails, recover. + Ref = monitor(process, Ctrl), + receive + {'DOWN', Ref, process, Ctrl, _Reason} -> + Coordinator ! {manager_idle, self()}, + flush_setup_messages(), + link_manager(Coordinator, UartPort, UartMod, <<>>) + end. + +%% Initiator: send sync preamble, create controller, report to coordinator. +%% The coordinator awards the first manager to respond and aborts the rest +%% (when setup is broadcast to multiple idle managers). +do_setup_handshake(Coordinator, UartPort, UartMod, SetupPid) -> + case serial_dist_controller:send_preamble(UartMod, UartPort) of + ok -> ok; + {error, PreambleReason} -> exit({uart_write_error, preamble, PreambleReason}) + end, + {ok, Ctrl} = serial_dist_controller:start(UartPort, UartMod), + Coordinator ! {setup_ctrl, self(), Ctrl, SetupPid}, + receive + {setup_awarded} -> + Ref = monitor(process, Ctrl), + receive + {'DOWN', Ref, process, Ctrl, _Reason} -> + Coordinator ! {manager_idle, self()}, + flush_setup_messages(), + link_manager(Coordinator, UartPort, UartMod, <<>>) + end; + {setup_abort} -> + exit(Ctrl, shutdown), + flush_setup_messages(), + link_manager(Coordinator, UartPort, UartMod, <<>>) + end. + +flush_setup_messages() -> + receive + {setup, _} -> flush_setup_messages() + after 0 -> + ok + end. + +accept_connection(_AcceptPid, DistCtrl, MyNode, Allowed, SetupTime) -> + Kernel = self(), + spawn_opt( + fun() -> do_accept_connection(Kernel, DistCtrl, MyNode, Allowed, SetupTime) end, + dist_util:net_ticker_spawn_options() + ). + +do_accept_connection(Kernel, DistCtrl, MyNode, Allowed, SetupTime) -> + Timer = dist_util:start_timer(SetupTime), + HSData = hs_data(Kernel, MyNode, DistCtrl, Allowed, undefined, undefined, undefined, Timer), + dist_util:handshake_other_started(HSData). + +hs_data(Kernel, MyNode, DistCtrl, Allowed, OtherNode, OtherVersion, Type, Timer) -> + #hs_data{ + kernel_pid = Kernel, + this_node = MyNode, + socket = DistCtrl, + timer = Timer, + this_flags = 0, + allowed = Allowed, + other_node = OtherNode, + other_version = OtherVersion, + f_send = fun serial_dist_controller:send/2, + f_recv = fun serial_dist_controller:recv/3, + f_setopts_pre_nodeup = fun serial_dist_controller:setopts_pre_nodeup/1, + f_setopts_post_nodeup = fun serial_dist_controller:setopts_post_nodeup/1, + f_getll = fun serial_dist_controller:getll/1, + f_address = fun serial_dist_controller:address/2, + f_handshake_complete = fun serial_dist_controller:handshake_complete/3, + mf_tick = fun serial_dist_controller:tick/1, + mf_getstat = fun serial_dist_controller:getstat/1, + request_type = Type + }. + +%% @doc Called by `net_kernel' to initiate an outgoing connection. +%% Forwards the request to the coordinator which broadcasts to idle +%% link managers. +setup(Node, Type, MyNode, LongOrShortNames, SetupTime) -> + Kernel = self(), + spawn_opt( + fun() -> do_setup(Kernel, Node, Type, MyNode, LongOrShortNames, SetupTime) end, + dist_util:net_ticker_spawn_options() + ). + +do_setup(Kernel, Node, Type, MyNode, _LongOrShortNames, SetupTime) -> + Timer = dist_util:start_timer(SetupTime), + case whereis(serial_dist_link_manager) of + Mgr when is_pid(Mgr) -> + Ref = monitor(process, Mgr), + Mgr ! {setup, self()}, + receive + {link_manager, DistController} -> + demonitor(Ref, [flush]), + dist_util:reset_timer(Timer), + true = serial_dist_controller:supervisor(DistController, self()), + HSData = hs_data( + Kernel, MyNode, DistController, [], Node, 6, Type, Timer + ), + dist_util:handshake_we_started(HSData); + {link_manager_unavailable} -> + demonitor(Ref, [flush]), + ?shutdown2(Node, no_link_managers_available); + {'DOWN', Ref, process, Mgr, _Reason} -> + ?shutdown2(Node, link_manager_died) + after 5000 -> + demonitor(Ref, [flush]), + ?shutdown2(Node, no_link_manager_response) + end; + _ -> + ?shutdown2(Node, no_link_manager) + end. + +close(Ports) when is_list(Ports) -> + lists:foreach(fun({UartPort, UartMod}) -> UartMod:close(UartPort) end, Ports); +close({UartPort, UartMod}) -> + UartMod:close(UartPort); +close(_) -> + ok. + +select(_Node) -> + true. diff --git a/libs/estdlib/src/serial_dist_controller.erl b/libs/estdlib/src/serial_dist_controller.erl new file mode 100644 index 0000000000..90a037dfc4 --- /dev/null +++ b/libs/estdlib/src/serial_dist_controller.erl @@ -0,0 +1,405 @@ +% +% This file is part of AtomVM. +% +% Copyright 2026 Paul Guyot +% +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% +% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later +% + +%%----------------------------------------------------------------------------- +%% @doc Distribution controller for serial (UART) connections. +%% +%% This module manages the serial link for the Erlang distribution protocol. +%% It is used by {@link serial_dist} and follows the same pattern as +%% {@link socket_dist_controller}. +%% +%% All packets on the wire use the following frame format: +%% +%% ``` +%% <<16#AA, 16#55, Length:LenBits, Payload:Length/binary, CRC32:32>> +%% ''' +%% +%% where `LenBits' is 16 during the handshake phase and 32 during the +%% data phase. CRC32 covers the `Length' and `Payload' bytes. +%% +%% The receiver scans for the `<<16#AA, 16#55>>' sync marker, reads the +%% length, validates against a maximum frame size (to reject false sync +%% matches), then verifies the CRC32. On CRC failure, the connection is +%% torn down (no retry/ACK mechanism). +%% +%% During the handshake phase, `send/2' and `recv/3' are called +%% synchronously. +%% +%% After `handshake_complete/3', the controller switches to asynchronous +%% mode: a dedicated reader process continuously reads from the UART and +%% forwards data to the controller, which reassembles framed distribution +%% packets and feeds them to `erlang:dist_ctrl_put_data/2'. +%% @end +%%----------------------------------------------------------------------------- +-module(serial_dist_controller). + +-include_lib("kernel/include/net_address.hrl"). + +% BEAM's dist_util expects packets to be list of integers. +-ifdef(BEAM_INTERFACE). +-define(POST_PROCESS(Packet), binary_to_list(Packet)). +-else. +-define(POST_PROCESS(Packet), Packet). +-endif. + +-define(SYNC_MAGIC, <<16#AA, 16#55>>). +-define(MAX_HANDSHAKE_FRAME_SIZE, 8192). +-define(MAX_DATA_FRAME_SIZE, 16#100000). +-define(PREAMBLE_COUNT, 16). + +% interface with serial_dist +-export([ + scan_frame/2, + send_preamble/2, + start/2, + start/3, + supervisor/2, + recv/3, + send/2, + setopts_pre_nodeup/1, + setopts_post_nodeup/1, + getll/1, + address/2, + tick/1, + getstat/1, + handshake_complete/3 +]). + +% gen_server API +-behaviour(gen_server). + +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). + +-record(state, { + uart :: pid() | port(), + uart_module :: module(), + dhandle :: reference() | undefined, + buffer :: binary(), + received :: non_neg_integer(), + sent :: non_neg_integer(), + reader :: pid() | undefined +}). + +%%-------------------------------------------------------------------- +%% Public API +%%-------------------------------------------------------------------- + +%% @doc Start a controller for an outgoing (setup) connection. +start(Uart, UartMod) -> + gen_server:start(?MODULE, {Uart, UartMod, <<>>}, []). + +%% @doc Start a controller for an incoming (accept) connection with +%% initial data already read from the UART by the accept loop. +start(Uart, UartMod, InitialData) when is_binary(InitialData) -> + gen_server:start(?MODULE, {Uart, UartMod, InitialData}, []). + +supervisor(Controller, Pid) -> + gen_server:call(Controller, {supervisor, Pid}). + +%% @doc Synchronous receive during handshake. Reads one complete +%% framed packet (with 16-bit length field) from UART. +recv(Controller, Length, Timeout) -> + gen_server:call(Controller, {recv, Length, Timeout}, infinity). + +%% @doc Synchronous send during handshake. Wraps data in a framed +%% packet with 16-bit length field. +send(Controller, Data) -> + gen_server:call(Controller, {send, Data}). + +setopts_pre_nodeup(_Controller) -> + ok. + +setopts_post_nodeup(_Controller) -> + ok. + +getll(Controller) -> + {ok, Controller}. + +address(_Controller, Node) -> + case string:split(atom_to_list(Node), "@") of + [_Name, Host] -> + #net_address{address = serial, host = Host, protocol = serial, family = serial}; + _ -> + {error, no_node} + end. + +tick(Controller) -> + gen_server:cast(Controller, tick). + +getstat(Controller) -> + gen_server:call(Controller, getstat). + +handshake_complete(Controller, _Node, DHandle) -> + gen_server:cast(Controller, {handshake_complete, DHandle}), + ok. + +%%-------------------------------------------------------------------- +%% gen_server callbacks +%%-------------------------------------------------------------------- + +init({Uart, UartMod, InitialBuffer}) -> + {ok, #state{ + uart = Uart, + uart_module = UartMod, + buffer = InitialBuffer, + received = 0, + sent = 0 + }}. + +handle_call({supervisor, Pid}, _From, State) -> + Result = link(Pid), + {reply, Result, State}; +handle_call( + {recv, _Length, Timeout}, + _From, + #state{uart = Uart, uart_module = UartMod, buffer = Buffer} = State +) -> + case recv_handshake_packet(Uart, UartMod, Buffer, Timeout) of + {ok, Packet, NewBuffer} -> + {reply, {ok, ?POST_PROCESS(Packet)}, State#state{buffer = NewBuffer}}; + {error, _} = Error -> + {reply, Error, State} + end; +handle_call( + {send, Data}, + _From, + #state{uart = Uart, uart_module = UartMod, sent = Sent} = State +) -> + DataBin = iolist_to_binary(Data), + DataSize = byte_size(DataBin), + Result = send_framed(UartMod, Uart, <>), + {reply, Result, State#state{sent = Sent + 1}}; +handle_call(getstat, _From, #state{received = Received, sent = Sent} = State) -> + {reply, {ok, Received, Sent, 0}, State}. + +handle_cast(tick, #state{uart = Uart, uart_module = UartMod, sent = Sent} = State) -> + case send_framed(UartMod, Uart, <<0:32>>) of + ok -> + {noreply, State#state{sent = Sent + 1}}; + {error, Reason} -> + {stop, {serial_write_error, Reason}, State} + end; +handle_cast( + {handshake_complete, DHandle}, + #state{uart = Uart, uart_module = UartMod, buffer = Buffer, received = Received} = State0 +) -> + ok = erlang:dist_ctrl_get_data_notification(DHandle), + % Process any data left over in the buffer from the handshake phase. + % After the handshake, packets use a 4-byte length prefix. + {NewBuffer, NewReceived} = process_recv_buffer(DHandle, Buffer, Received), + % Spawn a dedicated reader that blocks on uart:read/1 and + % forwards chunks to this gen_server. + Self = self(), + Reader = spawn_link(fun() -> reader_loop(Self, Uart, UartMod) end), + + {noreply, State0#state{ + dhandle = DHandle, + buffer = NewBuffer, + received = NewReceived, + reader = Reader + }}. + +handle_info(dist_data, State0) -> + State1 = send_data_loop(State0), + {noreply, State1}; +handle_info( + {serial_data, Data}, + #state{dhandle = DHandle, buffer = Buffer, received = Received} = State +) -> + NewBuffer = <>, + {NewBuffer2, NewReceived} = process_recv_buffer(DHandle, NewBuffer, Received), + + {noreply, State#state{buffer = NewBuffer2, received = NewReceived}}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Handshake-phase packet I/O +%%-------------------------------------------------------------------- + +recv_handshake_packet(Uart, UartMod, Buffer, Timeout) -> + case scan_frame(Buffer, 16) of + {ok, Packet, Rest} -> + {ok, Packet, Rest}; + {crc_error, _Rest} -> + {error, crc_error}; + {need_more, Trimmed} -> + case UartMod:read(Uart, Timeout) of + {ok, Data} -> + recv_handshake_packet( + Uart, UartMod, <>, Timeout + ); + {error, _} = Error -> + Error + end + end. + +%%-------------------------------------------------------------------- +%% Async reader process +%%-------------------------------------------------------------------- + +reader_loop(Controller, Uart, UartMod) -> + case UartMod:read(Uart) of + {ok, Data} -> + Controller ! {serial_data, Data}, + reader_loop(Controller, Uart, UartMod); + {error, Reason} -> + exit({serial_read_error, Reason}) + end. + +%%-------------------------------------------------------------------- +%% Data-phase packet I/O (4-byte length prefix) +%%-------------------------------------------------------------------- + +process_recv_buffer(DHandle, Buffer, Received) -> + case scan_frame(Buffer, 32) of + {ok, Packet, Rest} -> + case Packet of + <<>> -> ok; + _ -> ok = erlang:dist_ctrl_put_data(DHandle, Packet) + end, + process_recv_buffer(DHandle, Rest, Received + 1); + {crc_error, _Rest} -> + exit({serial_dist, crc_error}); + {need_more, Trimmed} -> + {Trimmed, Received} + end. + +send_data_loop(#state{dhandle = DHandle, uart = Uart, uart_module = UartMod, sent = Sent} = State) -> + case erlang:dist_ctrl_get_data(DHandle) of + none -> + ok = erlang:dist_ctrl_get_data_notification(DHandle), + State; + Data -> + DataBin = iolist_to_binary(Data), + DataSize = byte_size(DataBin), + case send_framed(UartMod, Uart, <>) of + ok -> + send_data_loop(State#state{sent = Sent + 1}); + {error, Reason} -> + exit({serial_write_error, Reason}) + end + end. + +%%-------------------------------------------------------------------- +%% Frame encoding / decoding +%%-------------------------------------------------------------------- + +send_framed(UartMod, Uart, LenAndPayload) -> + CRC = erlang:crc32(LenAndPayload), + UartMod:write(Uart, <<16#AA, 16#55, LenAndPayload/binary, CRC:32>>). + +%% @doc Scan a buffer for a framed packet. +%% +%% `LenBits' is 16 (handshake) or 32 (data phase). +%% Returns: +%% `{ok, Payload, Rest}' - a complete, CRC-verified frame was found +%% `{need_more, TrimmedBuffer}' - no complete frame yet +%% `{crc_error, Rest}' - a frame was found but CRC did not match +-spec scan_frame(binary(), 16 | 32) -> + {ok, binary(), binary()} | {need_more, binary()} | {crc_error, binary()}. +scan_frame(Buffer, LenBits) -> + scan_frame(Buffer, LenBits, 0). + +scan_frame(Buffer, LenBits, StartPos) -> + MaxSize = + case LenBits of + 16 -> ?MAX_HANDSHAKE_FRAME_SIZE; + 32 -> ?MAX_DATA_FRAME_SIZE + end, + LenBytes = LenBits div 8, + BufSize = byte_size(Buffer), + SearchLen = BufSize - StartPos, + case SearchLen < 2 of + true -> + %% Not enough bytes to find sync; keep trailing 0xAA if present + {need_more, keep_trailing_sync(Buffer, StartPos)}; + false -> + case binary:match(Buffer, ?SYNC_MAGIC, [{scope, {StartPos, SearchLen}}]) of + nomatch -> + {need_more, keep_trailing_sync(Buffer, StartPos)}; + {Pos, 2} -> + AfterSync = Pos + 2, + Remaining = BufSize - AfterSync, + case Remaining < LenBytes of + true -> + %% Have sync but not enough for length field + <<_:Pos/binary, Kept/binary>> = Buffer, + {need_more, Kept}; + false -> + <<_:AfterSync/binary, Len:LenBits, _/binary>> = Buffer, + case Len > MaxSize of + true -> + %% Length too large -- false sync, skip past it + scan_frame(Buffer, LenBits, Pos + 1); + false -> + FrameEnd = AfterSync + LenBytes + Len + 4, + case BufSize < FrameEnd of + true -> + <<_:Pos/binary, Kept/binary>> = Buffer, + {need_more, Kept}; + false -> + {_, AfterSyncBin} = split_binary(Buffer, AfterSync), + {LenAndPayloadBin, CrcAndRest} = split_binary( + AfterSyncBin, LenBytes + Len + ), + <> = CrcAndRest, + case erlang:crc32(LenAndPayloadBin) of + WireCRC -> + {_, Payload} = split_binary( + LenAndPayloadBin, LenBytes + ), + {ok, Payload, Rest}; + _ -> + {crc_error, Rest} + end + end + end + end + end + end. + +%% Keep a trailing 16#AA byte that might be the start of a sync marker. +keep_trailing_sync(Buffer, SearchFrom) -> + BufSize = byte_size(Buffer), + case BufSize > SearchFrom of + true -> + SkipBytes = BufSize - 1, + case Buffer of + <<_:SkipBytes/binary, 16#AA>> -> <<16#AA>>; + _ -> <<>> + end; + false -> + <<>> + end. + +send_preamble(UartMod, Uart) -> + UartMod:write(Uart, binary:copy(?SYNC_MAGIC, ?PREAMBLE_COUNT)). diff --git a/tests/libs/estdlib/CMakeLists.txt b/tests/libs/estdlib/CMakeLists.txt index b4b396a527..9e70b42d51 100644 --- a/tests/libs/estdlib/CMakeLists.txt +++ b/tests/libs/estdlib/CMakeLists.txt @@ -41,7 +41,13 @@ set(ERLANG_MODULES test_net test_net_kernel test_os + test_serial_dist + test_serial_dist_socat + test_serial_dist_socat_peer + test_serial_dist_beam_peer test_uart + mock_uart_hal + file_uart_hal test_proc_lib test_sets test_spawn @@ -61,5 +67,68 @@ set(ERLANG_MODULES ping_pong_server ) -pack_archive(test_estdlib_lib ${ERLANG_MODULES}) +pack_archive(test_estdlib_lib DEPENDS_ON eavmlib MODULES ${ERLANG_MODULES}) pack_test(test_estdlib estdlib eavmlib avm_network avm_unix etest) + +# Build .beam files for the BEAM peer used in BEAM<->AtomVM serial +# distribution tests. Only modules that AtomVM provides and OTP does +# not are placed here; OTP's own gen_server, dist_util, etc. are used +# as-is. serial_dist_controller is compiled with -DBEAM_INTERFACE so +# that recv returns lists (as OTP's dist_util expects). +add_custom_command( + OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/beam_beams/serial_dist_controller.beam + ${CMAKE_CURRENT_BINARY_DIR}/beam_beams/serial_dist.beam + COMMAND mkdir -p ${CMAKE_CURRENT_BINARY_DIR}/beam_beams + && erlc +debug_info -DBEAM_INTERFACE + -I ${CMAKE_SOURCE_DIR}/libs/include + -o ${CMAKE_CURRENT_BINARY_DIR}/beam_beams + ${CMAKE_SOURCE_DIR}/libs/estdlib/src/serial_dist_controller.erl + && ${CMAKE_COMMAND} -E copy + ${CMAKE_BINARY_DIR}/libs/estdlib/src/beams/serial_dist.beam + ${CMAKE_CURRENT_BINARY_DIR}/beam_beams/serial_dist.beam + DEPENDS ${CMAKE_SOURCE_DIR}/libs/estdlib/src/serial_dist_controller.erl + estdlib + COMMENT "Building BEAM peer modules for serial distribution tests" + VERBATIM +) +add_custom_target( + serial_dist_controller_beam ALL + DEPENDS ${CMAKE_CURRENT_BINARY_DIR}/beam_beams/serial_dist_controller.beam + ${CMAKE_CURRENT_BINARY_DIR}/beam_beams/serial_dist.beam +) + +# Build a separate AVM for the serial distribution peer node +if(AVM_DISABLE_JIT) + set(peer_precompiled_suffix "") + set(peer_jit_archives "") + set(peer_jit_targets "") +else() + set(peer_precompiled_suffix "-${AVM_JIT_TARGET_ARCH}") + set(peer_jit_archives ${CMAKE_BINARY_DIR}/libs/jit/src/jit${peer_precompiled_suffix}.avm) + set(peer_jit_targets jit) +endif() + +add_custom_command( + OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/test_serial_dist_socat_peer.avm + DEPENDS ${CMAKE_CURRENT_BINARY_DIR}/test_estdlib_lib.avm + ${CMAKE_BINARY_DIR}/libs/estdlib/src/estdlib${peer_precompiled_suffix}.avm + ${CMAKE_BINARY_DIR}/libs/eavmlib/src/eavmlib.avm + ${CMAKE_BINARY_DIR}/libs/avm_unix/src/avm_unix.avm + ${peer_jit_archives} + test_estdlib_lib estdlib eavmlib avm_unix ${peer_jit_targets} PackBEAM + COMMAND ${CMAKE_BINARY_DIR}/tools/packbeam/packbeam create + -s test_serial_dist_socat_peer + ${CMAKE_CURRENT_BINARY_DIR}/test_serial_dist_socat_peer.avm + ${CMAKE_CURRENT_BINARY_DIR}/test_estdlib_lib.avm + ${peer_jit_archives} + ${CMAKE_BINARY_DIR}/libs/estdlib/src/estdlib${peer_precompiled_suffix}.avm + ${CMAKE_BINARY_DIR}/libs/eavmlib/src/eavmlib.avm + ${CMAKE_BINARY_DIR}/libs/avm_unix/src/avm_unix.avm + COMMENT "Packing peer AVM for serial distribution tests" + VERBATIM +) +add_custom_target( + test_serial_dist_socat_peer ALL + DEPENDS ${CMAKE_CURRENT_BINARY_DIR}/test_serial_dist_socat_peer.avm +) + diff --git a/tests/libs/estdlib/file_uart_hal.erl b/tests/libs/estdlib/file_uart_hal.erl new file mode 100644 index 0000000000..3131ef6ae5 --- /dev/null +++ b/tests/libs/estdlib/file_uart_hal.erl @@ -0,0 +1,125 @@ +% +% This file is part of AtomVM. +% +% Copyright 2026 Paul Guyot +% +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% +% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later + +%%----------------------------------------------------------------------------- +%% @doc UART HAL for BEAM using an Erlang port and socat. +%% +%% This module provides a UART interface for BEAM by spawning a socat +%% process that bridges between stdin/stdout and a PTY device. Data +%% written to the port goes to the PTY; data arriving on the PTY is +%% delivered as port messages. +%% +%% Intended only for testing serial distribution over socat virtual +%% serial ports. On AtomVM, use the platform-specific `uart' module. +%% @end +%%----------------------------------------------------------------------------- +-module(file_uart_hal). + +-behaviour(uart_hal). +-behaviour(gen_server). + +-export([open/1, open/2, close/1, read/1, read/2, write/2]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). + +-record(state, { + port :: port(), + buffer :: binary(), + waiter :: {term(), reference() | undefined} | undefined +}). + +-spec open(list()) -> pid() | {error, term()}. +open(Opts) -> + {ok, Pid} = gen_server:start(?MODULE, Opts, []), + Pid. + +-spec open(uart_hal:peripheral(), list()) -> pid() | {error, term()}. +open(_Name, Opts) -> + open(Opts). + +-spec close(pid()) -> ok. +close(Pid) -> + gen_server:stop(Pid). + +-spec read(pid()) -> {ok, binary()} | {error, term()}. +read(Pid) -> + gen_server:call(Pid, {read, infinity}, infinity). + +-spec read(pid(), pos_integer() | infinity) -> {ok, binary()} | {error, term()}. +read(Pid, infinity) -> + read(Pid); +read(Pid, Timeout) -> + gen_server:call(Pid, {read, Timeout}, Timeout + 5000). + +-spec write(pid(), iodata()) -> ok. +write(Pid, Data) -> + gen_server:call(Pid, {write, Data}, infinity). + +%% gen_server callbacks + +init(Opts) -> + Peripheral = proplists:get_value(peripheral, Opts), + Cmd = "socat -b 65536 - " ++ Peripheral ++ ",rawer", + Port = open_port({spawn, Cmd}, [binary, stream, exit_status]), + {ok, #state{port = Port, buffer = <<>>, waiter = undefined}}. + +handle_call({read, _}, _From, #state{buffer = Buffer} = State) when + byte_size(Buffer) > 0 +-> + {reply, {ok, Buffer}, State#state{buffer = <<>>}}; +handle_call({read, infinity}, From, #state{buffer = <<>>} = State) -> + {noreply, State#state{waiter = {From, undefined}}}; +handle_call({read, Timeout}, From, #state{buffer = <<>>} = State) -> + Ref = erlang:send_after(Timeout, self(), read_timeout), + {noreply, State#state{waiter = {From, Ref}}}; +handle_call({write, Data}, _From, #state{port = Port} = State) -> + port_command(Port, iolist_to_binary(Data)), + {reply, ok, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({Port, {data, Data}}, #state{port = Port, waiter = undefined, buffer = Buffer} = State) -> + {noreply, State#state{buffer = <>}}; +handle_info( + {Port, {data, Data}}, #state{port = Port, waiter = {From, TimerRef}, buffer = Buffer} = State +) -> + cancel_timer(TimerRef), + gen_server:reply(From, {ok, <>}), + {noreply, State#state{buffer = <<>>, waiter = undefined}}; +handle_info(read_timeout, #state{waiter = {From, _}} = State) -> + gen_server:reply(From, {error, timeout}), + {noreply, State#state{waiter = undefined}}; +handle_info(read_timeout, State) -> + {noreply, State}; +handle_info({Port, {exit_status, _}}, #state{port = Port, waiter = Waiter} = State) -> + case Waiter of + {From, TimerRef} -> + cancel_timer(TimerRef), + gen_server:reply(From, {error, port_closed}); + undefined -> + ok + end, + {stop, port_closed, State#state{waiter = undefined}}. + +terminate(_Reason, #state{port = Port}) -> + catch port_close(Port), + ok. + +cancel_timer(undefined) -> ok; +cancel_timer(Ref) -> erlang:cancel_timer(Ref). diff --git a/tests/libs/estdlib/mock_uart_hal.erl b/tests/libs/estdlib/mock_uart_hal.erl new file mode 100644 index 0000000000..c9e5ef769e --- /dev/null +++ b/tests/libs/estdlib/mock_uart_hal.erl @@ -0,0 +1,138 @@ +% +% This file is part of AtomVM. +% +% Copyright 2026 Paul Guyot +% +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% +% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later + +%%----------------------------------------------------------------------------- +%% @doc Mock UART HAL for testing serial distribution without hardware. +%% +%% Creates pairs of connected virtual UART endpoints. Data written to +%% one endpoint becomes readable on the other, simulating a serial link. +%% +%% ``` +%% {A, B} = mock_uart_hal:create_pair(), +%% ok = mock_uart_hal:write(A, <<"hello">>), +%% {ok, <<"hello">>} = mock_uart_hal:read(B). +%% ''' +%% @end +%%----------------------------------------------------------------------------- +-module(mock_uart_hal). + +-behaviour(uart_hal). +-behaviour(gen_server). + +-export([create_pair/0, open/1, open/2, close/1, read/1, read/2, write/2]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). + +-record(state, { + peer :: pid() | undefined, + buffer :: binary(), + waiter :: {term(), reference() | undefined} | undefined +}). + +%% @doc Create a pair of connected virtual UART endpoints. +%% Data written to the first pid is readable from the second, and vice versa. +-spec create_pair() -> {pid(), pid()}. +create_pair() -> + {ok, A} = gen_server:start(?MODULE, [], []), + {ok, B} = gen_server:start(?MODULE, [], []), + ok = gen_server:call(A, {set_peer, B}), + ok = gen_server:call(B, {set_peer, A}), + {A, B}. + +%% @doc Open a mock UART endpoint. +%% Accepts `{mock_pid, Pid}' in Opts where Pid is an endpoint +%% previously returned by {@link create_pair/0}. +-spec open(proplists:proplist()) -> pid() | {error, term()}. +open(Opts) -> + case proplists:get_value(mock_pid, Opts) of + undefined -> {error, no_mock_pid}; + Pid when is_pid(Pid) -> Pid + end. + +%% @doc Open with peripheral name (ignored for mock). +-spec open(term(), proplists:proplist()) -> pid() | {error, term()}. +open(_Name, Opts) -> + open(Opts). + +-spec close(pid()) -> ok. +close(Pid) -> + gen_server:stop(Pid). + +%% @doc Blocking read — waits until data is available. +%% This matches the behavior expected by serial_dist_controller's +%% reader_loop, which calls read/1 in a tight loop. +-spec read(pid()) -> {ok, binary()} | {error, term()}. +read(Pid) -> + gen_server:call(Pid, {read, infinity}, infinity). + +%% @doc Read with timeout in milliseconds. +%% Returns `{error, timeout}' if no data arrives within the timeout. +-spec read(pid(), pos_integer() | infinity) -> {ok, binary()} | {error, term()}. +read(Pid, infinity) -> + read(Pid); +read(Pid, Timeout) -> + gen_server:call(Pid, {read, Timeout}, Timeout + 5000). + +%% @doc Write data to the UART. Data becomes readable on the peer endpoint. +-spec write(pid(), iodata()) -> ok. +write(Pid, Data) -> + gen_server:call(Pid, {write, iolist_to_binary(Data)}). + +%% gen_server callbacks + +init([]) -> + {ok, #state{peer = undefined, buffer = <<>>, waiter = undefined}}. + +handle_call({set_peer, Peer}, _From, State) -> + {reply, ok, State#state{peer = Peer}}; +handle_call({read, _Timeout}, _From, #state{buffer = Buffer} = State) when + byte_size(Buffer) > 0 +-> + {reply, {ok, Buffer}, State#state{buffer = <<>>}}; +handle_call({read, infinity}, From, #state{buffer = <<>>} = State) -> + {noreply, State#state{waiter = {From, undefined}}}; +handle_call({read, Timeout}, From, #state{buffer = <<>>} = State) -> + Ref = erlang:send_after(Timeout, self(), read_timeout), + {noreply, State#state{waiter = {From, Ref}}}; +handle_call({write, Data}, _From, #state{peer = Peer} = State) -> + Peer ! {mock_uart_data, Data}, + {reply, ok, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({mock_uart_data, Data}, #state{waiter = undefined, buffer = Buffer} = State) -> + {noreply, State#state{buffer = <>}}; +handle_info({mock_uart_data, Data}, #state{waiter = {From, TimerRef}, buffer = Buffer} = State) -> + cancel_timer(TimerRef), + gen_server:reply(From, {ok, <>}), + {noreply, State#state{buffer = <<>>, waiter = undefined}}; +handle_info(read_timeout, #state{waiter = {From, _}} = State) -> + gen_server:reply(From, {error, timeout}), + {noreply, State#state{waiter = undefined}}; +handle_info(read_timeout, State) -> + %% Stale timer after data arrived — ignore + {noreply, State}; +handle_info(_Msg, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +cancel_timer(undefined) -> ok; +cancel_timer(Ref) -> erlang:cancel_timer(Ref). diff --git a/tests/libs/estdlib/test_serial_dist.erl b/tests/libs/estdlib/test_serial_dist.erl new file mode 100644 index 0000000000..2e55143512 --- /dev/null +++ b/tests/libs/estdlib/test_serial_dist.erl @@ -0,0 +1,434 @@ +% +% This file is part of AtomVM. +% +% Copyright 2026 Paul Guyot +% +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% +% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later + +-module(test_serial_dist). + +-export([test/0, start/0]). + +start() -> + test(). + +test() -> + ok = test_mock_uart_roundtrip(), + ok = test_mock_uart_read_timeout(), + ok = test_mock_uart_bidirectional(), + ok = test_mock_uart_buffering(), + ok = test_mock_uart_concurrent_write_during_read(), + ok = test_scan_frame_valid_at_start(), + ok = test_scan_frame_valid_after_garbage(), + ok = test_scan_frame_false_sync_large_length(), + ok = test_scan_frame_crc_error(), + ok = test_scan_frame_empty_buffer(), + ok = test_scan_frame_only_sync_markers(), + ok = test_scan_frame_trailing_aa(), + ok = test_scan_frame_partial_length(), + ok = test_scan_frame_partial_payload(), + ok = test_scan_frame_empty_payload(), + ok = test_scan_frame_32bit_mode(), + ok = test_scan_frame_multiple_frames(), + ok = test_scan_frame_max_handshake_boundary(), + case erlang:system_info(machine) of + "BEAM" -> + %% Controller tests use mock_uart_hal which requires AtomVM + io:format("test_serial_dist: skipping controller tests on BEAM~n"), + ok; + _ -> + ok = test_controller_handshake_send_framing(), + ok = test_controller_handshake_recv(), + ok = test_controller_handshake_recv_fragmented(), + ok = test_controller_roundtrip(), + ok = test_controller_tick(), + ok = test_controller_getstat(), + ok = test_controller_initial_data(), + ok + end. + +%%-------------------------------------------------------------------- +%% Mock UART HAL tests +%%-------------------------------------------------------------------- + +test_mock_uart_roundtrip() -> + {A, B} = mock_uart_hal:create_pair(), + ok = mock_uart_hal:write(A, <<"hello">>), + {ok, <<"hello">>} = mock_uart_hal:read(B), + ok = mock_uart_hal:write(B, <<"world">>), + {ok, <<"world">>} = mock_uart_hal:read(A), + mock_uart_hal:close(A), + mock_uart_hal:close(B), + ok. + +test_mock_uart_read_timeout() -> + {A, B} = mock_uart_hal:create_pair(), + {error, timeout} = mock_uart_hal:read(B, 100), + %% Verify the endpoint is still functional after a timeout + ok = mock_uart_hal:write(A, <<"after_timeout">>), + {ok, <<"after_timeout">>} = mock_uart_hal:read(B, 1000), + mock_uart_hal:close(A), + mock_uart_hal:close(B), + ok. + +test_mock_uart_bidirectional() -> + {A, B} = mock_uart_hal:create_pair(), + ok = mock_uart_hal:write(A, <<"from_a">>), + ok = mock_uart_hal:write(B, <<"from_b">>), + {ok, <<"from_b">>} = mock_uart_hal:read(A), + {ok, <<"from_a">>} = mock_uart_hal:read(B), + mock_uart_hal:close(A), + mock_uart_hal:close(B), + ok. + +test_mock_uart_buffering() -> + {A, B} = mock_uart_hal:create_pair(), + %% Multiple writes before a read should concatenate + ok = mock_uart_hal:write(A, <<"hel">>), + ok = mock_uart_hal:write(A, <<"lo">>), + %% Small delay to let both messages be processed by B's gen_server + receive + after 50 -> ok + end, + {ok, <<"hello">>} = mock_uart_hal:read(B), + mock_uart_hal:close(A), + mock_uart_hal:close(B), + ok. + +test_mock_uart_concurrent_write_during_read() -> + %% Verify that a write completes while a read is pending on the peer. + %% This is critical: serial_dist_controller's reader_loop blocks on + %% read/1 while the gen_server must still accept write/2 calls. + {A, B} = mock_uart_hal:create_pair(), + Parent = self(), + %% Start a read that will block (no data yet) + spawn_link(fun() -> + {ok, Data} = mock_uart_hal:read(B), + Parent ! {read_result, Data} + end), + %% Give the reader time to block + receive + after 50 -> ok + end, + %% Write should succeed even though a read is pending on the peer + ok = mock_uart_hal:write(A, <<"concurrent">>), + receive + {read_result, <<"concurrent">>} -> ok + after 5000 -> + exit(concurrent_write_timeout) + end, + mock_uart_hal:close(A), + mock_uart_hal:close(B), + ok. + +%%-------------------------------------------------------------------- +%% scan_frame unit tests +%%-------------------------------------------------------------------- + +test_scan_frame_valid_at_start() -> + %% Complete valid frame at the start of buffer + Frame = make_handshake_frame(<<"hello">>), + {ok, <<"hello">>, <<>>} = serial_dist_controller:scan_frame(Frame, 16), + ok. + +test_scan_frame_valid_after_garbage() -> + %% Valid frame preceded by garbage bytes + Frame = make_handshake_frame(<<"data">>), + Buffer = <<1, 2, 3, 4, Frame/binary>>, + {ok, <<"data">>, <<>>} = serial_dist_controller:scan_frame(Buffer, 16), + ok. + +test_scan_frame_false_sync_large_length() -> + %% False sync marker followed by a length that exceeds max frame size + %% should be skipped; the real frame follows. + RealFrame = make_handshake_frame(<<"ok">>), + %% 16#FF00 > MAX_HANDSHAKE_FRAME_SIZE (8192), so this is a false sync + FalseSync = <<16#AA, 16#55, 16#FF, 16#00>>, + Buffer = <>, + {ok, <<"ok">>, <<>>} = serial_dist_controller:scan_frame(Buffer, 16), + ok. + +test_scan_frame_crc_error() -> + %% Valid frame structure but corrupted payload -> CRC mismatch + Payload = <<"test">>, + Len = byte_size(Payload), + LenAndPayload = <>, + BadCRC = erlang:crc32(LenAndPayload) bxor 16#FFFFFFFF, + Buffer = <<16#AA, 16#55, LenAndPayload/binary, BadCRC:32>>, + {crc_error, <<>>} = serial_dist_controller:scan_frame(Buffer, 16), + ok. + +test_scan_frame_empty_buffer() -> + {need_more, <<>>} = serial_dist_controller:scan_frame(<<>>, 16), + ok. + +test_scan_frame_only_sync_markers() -> + %% Buffer containing only sync markers -- these are idle sync frames + %% from the link manager, not real packets. + Buffer = <<16#AA, 16#55, 16#AA, 16#55>>, + %% The scanner will find sync at pos 0, read the 2-byte length + %% field as <<16#AA, 16#55>> = 43605 which exceeds max handshake + %% frame size, so it skips. Then finds sync at pos 2, but not + %% enough bytes for a length field. + {need_more, <<16#AA, 16#55>>} = serial_dist_controller:scan_frame(Buffer, 16), + ok. + +test_scan_frame_trailing_aa() -> + %% A single trailing 0xAA that might be the start of a sync marker + %% should be kept in the buffer for the next read. + {need_more, <<16#AA>>} = serial_dist_controller:scan_frame(<<16#AA>>, 16), + ok. + +test_scan_frame_partial_length() -> + %% Sync marker present but not enough bytes for the length field + Buffer = <<16#AA, 16#55, 16#00>>, + {need_more, <<16#AA, 16#55, 16#00>>} = serial_dist_controller:scan_frame(Buffer, 16), + ok. + +test_scan_frame_partial_payload() -> + %% Sync + length present but payload is incomplete + Buffer = <<16#AA, 16#55, 5:16, "hel">>, + %% Length = 5, but only 3 bytes of payload present (no CRC either) + {need_more, <<16#AA, 16#55, 5:16, "hel">>} = + serial_dist_controller:scan_frame(Buffer, 16), + ok. + +test_scan_frame_empty_payload() -> + %% Frame with zero-length payload (used for handshake edge cases) + Frame = make_handshake_frame(<<>>), + {ok, <<>>, <<>>} = serial_dist_controller:scan_frame(Frame, 16), + ok. + +test_scan_frame_32bit_mode() -> + %% Data-phase frame with 32-bit length prefix + Payload = <<"data_phase">>, + Len = byte_size(Payload), + LenAndPayload = <>, + CRC = erlang:crc32(LenAndPayload), + Frame = <<16#AA, 16#55, LenAndPayload/binary, CRC:32>>, + {ok, <<"data_phase">>, <<>>} = serial_dist_controller:scan_frame(Frame, 32), + ok. + +test_scan_frame_multiple_frames() -> + %% Two complete frames concatenated; scan_frame returns the first + %% and the rest contains the second. + Frame1 = make_handshake_frame(<<"first">>), + Frame2 = make_handshake_frame(<<"second">>), + Buffer = <>, + {ok, <<"first">>, Rest} = serial_dist_controller:scan_frame(Buffer, 16), + {ok, <<"second">>, <<>>} = serial_dist_controller:scan_frame(Rest, 16), + ok. + +test_scan_frame_max_handshake_boundary() -> + %% Frame with length exactly at MAX_HANDSHAKE_FRAME_SIZE (8192) should + %% be accepted; length at 8193 should be treated as false sync. + %% We don't actually build an 8192-byte payload (too large for test), + %% but we can verify that a length of 8193 is rejected as false sync. + LenAndPayload = <<8193:16, 0>>, + CRC = erlang:crc32(LenAndPayload), + Buffer = <<16#AA, 16#55, LenAndPayload/binary, CRC:32>>, + %% 8193 > 8192 so this sync is treated as false; scanner looks for + %% another sync marker and finds none. + {need_more, _} = serial_dist_controller:scan_frame(Buffer, 16), + ok. + +%%-------------------------------------------------------------------- +%% Controller handshake-phase tests +%%-------------------------------------------------------------------- + +test_controller_handshake_send_framing() -> + %% Verify that send/2 produces a framed packet: + %% <<16#AA, 16#55, Len:16, Payload, CRC32:32>> + {A, B} = mock_uart_hal:create_pair(), + {ok, Ctrl} = serial_dist_controller:start(A, mock_uart_hal), + ok = serial_dist_controller:send(Ctrl, <<"hello">>), + {ok, Raw} = mock_uart_hal:read(B), + Expected = make_handshake_frame(<<"hello">>), + Expected = Raw, + %% Test with empty payload + ok = serial_dist_controller:send(Ctrl, <<>>), + {ok, Raw2} = mock_uart_hal:read(B), + Expected2 = make_handshake_frame(<<>>), + Expected2 = Raw2, + stop_controller(Ctrl), + mock_uart_hal:close(A), + mock_uart_hal:close(B), + ok. + +test_controller_handshake_recv() -> + %% Verify that recv/3 correctly parses a framed handshake packet + {A, B} = mock_uart_hal:create_pair(), + {ok, Ctrl} = serial_dist_controller:start(A, mock_uart_hal), + %% Write a properly framed handshake packet to the peer side + ok = mock_uart_hal:write(B, make_handshake_frame(<<"hello">>)), + {ok, Packet} = serial_dist_controller:recv(Ctrl, 0, 5000), + <<"hello">> = iolist_to_binary(Packet), + stop_controller(Ctrl), + mock_uart_hal:close(A), + mock_uart_hal:close(B), + ok. + +test_controller_handshake_recv_fragmented() -> + %% Verify that recv reassembles packets arriving in fragments. + %% This is the most common scenario on real UART links, especially + %% at low baud rates where OS/driver may deliver partial chunks. + {A, B} = mock_uart_hal:create_pair(), + {ok, Ctrl} = serial_dist_controller:start(A, mock_uart_hal), + Parent = self(), + Frame = make_handshake_frame(<<"hello">>), + %% Split the frame into 3 chunks + ChunkSize1 = 4, + ChunkSize2 = 3, + <> = Frame, + %% Start recv in a separate process (it will block) + spawn_link(fun() -> + {ok, Packet} = serial_dist_controller:recv(Ctrl, 0, 10000), + Parent ! {recv_result, iolist_to_binary(Packet)} + end), + %% Send first chunk (sync magic + partial length) + receive + after 50 -> ok + end, + ok = mock_uart_hal:write(B, Chunk1), + %% Send second chunk (rest of length + partial payload) + receive + after 50 -> ok + end, + ok = mock_uart_hal:write(B, Chunk2), + %% Send third chunk (rest of payload + CRC) + receive + after 50 -> ok + end, + ok = mock_uart_hal:write(B, Chunk3), + receive + {recv_result, <<"hello">>} -> ok + after 10000 -> + exit(fragmented_recv_timeout) + end, + stop_controller(Ctrl), + mock_uart_hal:close(A), + mock_uart_hal:close(B), + ok. + +test_controller_roundtrip() -> + %% Two controllers connected via mock UART: send from one, recv on other + {A, B} = mock_uart_hal:create_pair(), + {ok, CtrlA} = serial_dist_controller:start(A, mock_uart_hal), + {ok, CtrlB} = serial_dist_controller:start(B, mock_uart_hal), + %% A sends, B receives + ok = serial_dist_controller:send(CtrlA, <<"from_a">>), + {ok, PacketB} = serial_dist_controller:recv(CtrlB, 0, 5000), + <<"from_a">> = iolist_to_binary(PacketB), + %% B sends, A receives + ok = serial_dist_controller:send(CtrlB, <<"from_b">>), + {ok, PacketA} = serial_dist_controller:recv(CtrlA, 0, 5000), + <<"from_b">> = iolist_to_binary(PacketA), + %% Multiple packets in sequence + ok = serial_dist_controller:send(CtrlA, <<"msg1">>), + ok = serial_dist_controller:send(CtrlA, <<"msg2">>), + {ok, P1} = serial_dist_controller:recv(CtrlB, 0, 5000), + <<"msg1">> = iolist_to_binary(P1), + {ok, P2} = serial_dist_controller:recv(CtrlB, 0, 5000), + <<"msg2">> = iolist_to_binary(P2), + stop_controller(CtrlA), + stop_controller(CtrlB), + mock_uart_hal:close(A), + mock_uart_hal:close(B), + ok. + +test_controller_tick() -> + %% Verify tick sends a framed 4-byte zero (keepalive) + {A, B} = mock_uart_hal:create_pair(), + {ok, Ctrl} = serial_dist_controller:start(A, mock_uart_hal), + ok = serial_dist_controller:tick(Ctrl), + %% tick is a cast, give it a moment to process + receive + after 50 -> ok + end, + {ok, Raw} = mock_uart_hal:read(B, 1000), + Expected = make_frame(<<0:32>>), + Expected = Raw, + stop_controller(Ctrl), + mock_uart_hal:close(A), + mock_uart_hal:close(B), + ok. + +test_controller_getstat() -> + %% Verify sent/received counters + {A, B} = mock_uart_hal:create_pair(), + {ok, CtrlA} = serial_dist_controller:start(A, mock_uart_hal), + {ok, CtrlB} = serial_dist_controller:start(B, mock_uart_hal), + %% Initial stats: 0 sent, 0 received + {ok, 0, 0, 0} = serial_dist_controller:getstat(CtrlA), + %% Send two packets + ok = serial_dist_controller:send(CtrlA, <<"msg1">>), + ok = serial_dist_controller:send(CtrlA, <<"msg2">>), + {ok, 0, 2, 0} = serial_dist_controller:getstat(CtrlA), + %% Tick also increments sent counter + ok = serial_dist_controller:tick(CtrlA), + receive + after 50 -> ok + end, + {ok, 0, 3, 0} = serial_dist_controller:getstat(CtrlA), + %% Drain the packets on B side (recv doesn't increment received + %% counter — that only happens in data phase via process_recv_buffer) + {ok, _} = serial_dist_controller:recv(CtrlB, 0, 1000), + {ok, _} = serial_dist_controller:recv(CtrlB, 0, 1000), + stop_controller(CtrlA), + stop_controller(CtrlB), + mock_uart_hal:close(A), + mock_uart_hal:close(B), + ok. + +test_controller_initial_data() -> + %% Verify that initial data passed to start/3 is used for the first recv. + %% This simulates the accept path where the link manager reads the first + %% chunk from UART and passes it to the controller. + {A, B} = mock_uart_hal:create_pair(), + InitialData = make_handshake_frame(<<"hello">>), + {ok, Ctrl} = serial_dist_controller:start(A, mock_uart_hal, InitialData), + %% recv should return the packet from initial data without reading UART + {ok, Packet} = serial_dist_controller:recv(Ctrl, 0, 1000), + <<"hello">> = iolist_to_binary(Packet), + %% Subsequent recv should read from UART as normal + ok = mock_uart_hal:write(B, make_handshake_frame(<<"abc">>)), + {ok, Packet2} = serial_dist_controller:recv(Ctrl, 0, 5000), + <<"abc">> = iolist_to_binary(Packet2), + stop_controller(Ctrl), + mock_uart_hal:close(A), + mock_uart_hal:close(B), + ok. + +%%-------------------------------------------------------------------- +%% Helpers +%%-------------------------------------------------------------------- + +%% Build a framed packet as the controller would send it on the wire. +%% Format: <<16#AA, 16#55, LenAndPayload/binary, CRC32:32>> +make_frame(LenAndPayload) -> + CRC = erlang:crc32(LenAndPayload), + <<16#AA, 16#55, LenAndPayload/binary, CRC:32>>. + +%% Build a handshake frame (16-bit length prefix). +make_handshake_frame(Payload) -> + Len = byte_size(Payload), + make_frame(<>). + +stop_controller(Ctrl) -> + unlink(Ctrl), + exit(Ctrl, shutdown), + receive + after 10 -> ok + end. diff --git a/tests/libs/estdlib/test_serial_dist_beam_peer.erl b/tests/libs/estdlib/test_serial_dist_beam_peer.erl new file mode 100644 index 0000000000..f5aa414662 --- /dev/null +++ b/tests/libs/estdlib/test_serial_dist_beam_peer.erl @@ -0,0 +1,44 @@ +% +% This file is part of AtomVM. +% +% Copyright 2026 Paul Guyot +% +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% +% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later + +%% @doc BEAM peer node for serial distribution tests. +%% +%% This module runs on BEAM (OTP). It is spawned by AtomVM's +%% test_serial_dist_socat to test BEAM<->AtomVM distribution over +%% socat virtual serial ports. +%% +%% Reads PTY_PATH and TEST_NAME from environment variables. +%% Uses OTP's native net_kernel with `-proto_dist serial' so that +%% serial_dist is used as the distribution protocol. UART options +%% are passed to serial_dist via application env. + +-module(test_serial_dist_beam_peer). + +-export([start/0]). + +start() -> + PtyPath = os:getenv("PTY_PATH"), + TestName = os:getenv("TEST_NAME"), + application:set_env(serial_dist, dist_opts, #{ + uart_opts => [{peripheral, PtyPath}, {speed, 115200}], + uart_module => file_uart_hal + }), + {ok, _} = net_kernel:start('beam_b@serial.local', #{name_domain => longnames}), + erlang:set_cookie('SerialTest'), + test_serial_dist_socat_peer:run_test('a@serial.local', TestName). diff --git a/tests/libs/estdlib/test_serial_dist_socat.erl b/tests/libs/estdlib/test_serial_dist_socat.erl new file mode 100644 index 0000000000..23714f6044 --- /dev/null +++ b/tests/libs/estdlib/test_serial_dist_socat.erl @@ -0,0 +1,385 @@ +% +% This file is part of AtomVM. +% +% Copyright 2026 Paul Guyot +% +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% +% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later + +%% @doc Tests for serial (UART) distribution over socat virtual serial ports. +%% +%% These tests create a socat pty pair, start a second AtomVM process as +%% the peer node, and verify distribution works over the serial link. +%% +%% Requires: socat, AtomVM binary in PATH or known location. + +-module(test_serial_dist_socat). + +-export([test/0, start/0]). + +start() -> + test(). + +test() -> + case erlang:system_info(machine) of + "ATOM" -> + case has_socat() andalso has_working_ptys() of + true -> + %% Pre-warm JIT compilation of modules used during + %% the distribution handshake so their compilation + %% doesn't eat into the handshake timer. + _ = crypto:module_info(), + _ = dist_util:module_info(), + _ = uart:module_info(), + _ = timer_manager:module_info(), + ok = test_ping_over_serial(), + ok = test_rpc_over_serial(), + ok = test_beam_ping_over_serial(), + ok = test_beam_rpc_over_serial(), + ok = test_multi_port_ping(), + ok; + false -> + io:format("test_serial_dist_socat: socat/ptys not available, skipping~n"), + ok + end; + _ -> + io:format("test_serial_dist_socat: skipping on BEAM~n"), + ok + end. + +has_socat() -> + try + {ok, _, Fd} = atomvm:subprocess( + "/bin/sh", ["sh", "-c", "command -v socat"], undefined, [stdout] + ), + Result = + case atomvm:posix_read(Fd, 200) of + eof -> false; + {ok, _} -> true + end, + ok = atomvm:posix_close(Fd), + Result + catch + _:_ -> false + end. + +%% Verify that socat ptys support termios (fails under qemu-user). +has_working_ptys() -> + try + {OsPid, SocatFd, PtyA, _PtyB} = start_socat(), + Result = + case atomvm:posix_open(PtyA, [o_rdwr, o_noctty]) of + {ok, Fd} -> + case atomvm:posix_tcgetattr(Fd) of + {ok, _} -> + atomvm:posix_close(Fd), + true; + {error, _} -> + atomvm:posix_close(Fd), + false + end; + {error, _} -> + false + end, + stop_socat(OsPid, SocatFd), + Result + catch + _:_ -> false + end. + +%%-------------------------------------------------------------------- +%% Test: ping over serial distribution +%%-------------------------------------------------------------------- + +test_ping_over_serial() -> run_serial_test(atomvm, "ping"). +test_rpc_over_serial() -> run_serial_test(atomvm, "rpc"). +test_beam_ping_over_serial() -> run_serial_test(beam, "ping"). +test_beam_rpc_over_serial() -> run_serial_test(beam, "rpc"). + +run_serial_test(PeerType, TestName) -> + {OsPid, SocatFd, PtyA, PtyB} = start_socat(), + try + {ok, _} = net_kernel:start('a@serial.local', #{ + name_domain => longnames, + proto_dist => serial_dist, + avm_dist_opts => #{ + uart_opts => [{peripheral, binary_to_list(PtyA)}, {speed, 115200}], + uart_module => uart + } + }), + erlang:set_cookie('SerialTest'), + register(test_serial, self()), + PeerFd = start_peer_node(PeerType, PtyB, TestName), + try + handle_peer_request(TestName, PeerFd), + Result = read_peer_line(PeerFd), + Expected = expected_result(TestName), + io:format("~p ~s result: ~s~n", [PeerType, TestName, Result]), + Expected = Result, + ok + after + catch unregister(test_serial), + atomvm:posix_close(PeerFd) + end + after + catch net_kernel:stop(), + stop_socat(OsPid, SocatFd) + end. + +start_peer_node(atomvm, PtyB, TestName) -> start_peer(PtyB, TestName); +start_peer_node(beam, PtyB, TestName) -> start_beam_peer(PtyB, TestName). + +handle_peer_request("ping", PeerFd) -> + receive + {PeerPid, ping} -> PeerPid ! {self(), pong} + after 30000 -> + drain_peer_output(PeerFd), + error(ping_timeout) + end; +handle_peer_request("rpc", PeerFd) -> + receive + {PeerPid, {apply, M, F, A}} -> + Result = apply(M, F, A), + PeerPid ! {self(), Result} + after 30000 -> + drain_peer_output(PeerFd), + error(rpc_timeout) + end. + +expected_result("ping") -> <<"pong">>; +expected_result("rpc") -> <<"ATOM">>. + +%%-------------------------------------------------------------------- +%% Test: multi-port ping (two peers via two UARTs) +%%-------------------------------------------------------------------- + +test_multi_port_ping() -> + %% Create two socat pairs: one for peer B, one for peer C. + %% Needs 4 PTYs; skip if the system cannot allocate them. + {OsPid1, SocatFd1, PtyA1, PtyB} = start_socat(), + try start_socat() of + {OsPid2, SocatFd2, PtyA2, PtyC} -> + test_multi_port_ping(OsPid1, SocatFd1, PtyA1, PtyB, OsPid2, SocatFd2, PtyA2, PtyC) + catch + _:_ -> + stop_socat(OsPid1, SocatFd1), + io:format("test_multi_port_ping: not enough PTYs, skipping~n"), + ok + end. + +test_multi_port_ping(OsPid1, SocatFd1, PtyA1, PtyB, OsPid2, SocatFd2, PtyA2, PtyC) -> + try + %% Start node A with two UART ports + {ok, _} = net_kernel:start('a@serial.local', #{ + name_domain => longnames, + proto_dist => serial_dist, + avm_dist_opts => #{ + uart_ports => [ + [{peripheral, binary_to_list(PtyA1)}, {speed, 115200}], + [{peripheral, binary_to_list(PtyA2)}, {speed, 115200}] + ], + uart_module => uart + } + }), + erlang:set_cookie('SerialTest'), + register(test_serial, self()), + %% Connect peers sequentially to avoid handshake timer + %% contention from simultaneous JIT compilation. + %% Peer B on first UART + PeerFdB = start_peer_node(atomvm, PtyB, "ping"), + handle_peer_request("ping", PeerFdB), + ResultB = read_peer_line(PeerFdB), + io:format("multi_port peer B result: ~s~n", [ResultB]), + <<"pong">> = ResultB, + %% Peer C on second UART (keep B alive to avoid disturbing A) + PeerFdC = start_peer_node(atomvm, PtyC, "ping"), + handle_peer_request("ping", PeerFdC), + ResultC = read_peer_line(PeerFdC), + io:format("multi_port peer C result: ~s~n", [ResultC]), + <<"pong">> = ResultC, + catch unregister(test_serial), + ok + after + catch net_kernel:stop(), + stop_socat(OsPid1, SocatFd1), + stop_socat(OsPid2, SocatFd2) + end. + +%%-------------------------------------------------------------------- +%% Helpers +%%-------------------------------------------------------------------- + +start_socat() -> + {ok, OsPid, Fd} = atomvm:subprocess( + "/bin/sh", + ["sh", "-c", "socat -d -d pty,raw,echo=0 pty,raw,echo=0 2>&1"], + undefined, + [stdout] + ), + PtyA = extract_pty(read_line(Fd)), + PtyB = extract_pty(read_line(Fd)), + receive + after 200 -> ok + end, + {OsPid, Fd, PtyA, PtyB}. + +stop_socat(OsPid, Fd) -> + atomvm:posix_close(Fd), + {ok, _, KillFd} = atomvm:subprocess( + "/bin/kill", ["kill", integer_to_list(OsPid)], undefined, [stdout] + ), + atomvm:posix_close(KillFd). + +start_peer(PtyPath, TestName) -> + AvmBin = find_atomvm_binary(), + PeerAvm = find_peer_avm(), + Cmd = + "PTY_PATH=" ++ binary_to_list(PtyPath) ++ + " TEST_NAME=" ++ TestName ++ + " " ++ AvmBin ++ " " ++ PeerAvm ++ " 2>&1", + io:format("test_serial_dist_socat: peer cmd: ~s~n", [Cmd]), + {ok, _PeerPid, PeerFd} = atomvm:subprocess( + "/bin/sh", ["sh", "-c", Cmd], undefined, [stdout] + ), + PeerFd. + +start_beam_peer(PtyPath, TestName) -> + BeamBeams = find_beam_beams(), + TestBeams = find_test_beams(), + Cmd = + "PTY_PATH=" ++ binary_to_list(PtyPath) ++ + " TEST_NAME=" ++ TestName ++ + " erl -proto_dist serial -start_epmd false -kernel logger_level none" ++ + " -pa " ++ BeamBeams ++ " -pa " ++ TestBeams ++ + " -noshell -s test_serial_dist_beam_peer -s init stop 2>/dev/null", + io:format("test_serial_dist_socat: BEAM peer cmd: ~s~n", [Cmd]), + {ok, _PeerPid, PeerFd} = atomvm:subprocess( + "/bin/sh", ["sh", "-c", Cmd], undefined, [stdout] + ), + PeerFd. + +find_beam_beams() -> + case atomvm:posix_stat("./tests/libs/estdlib/beam_beams") of + {ok, _} -> "./tests/libs/estdlib/beam_beams"; + _ -> "tests/libs/estdlib/beam_beams" + end. + +find_test_beams() -> + case atomvm:posix_stat("./tests/libs/estdlib/beams") of + {ok, _} -> "./tests/libs/estdlib/beams"; + _ -> "tests/libs/estdlib/beams" + end. + +find_atomvm_binary() -> + case atomvm:posix_stat("./src/AtomVM") of + {ok, _} -> + "./src/AtomVM"; + _ -> + case atomvm:posix_stat("./AtomVM") of + {ok, _} -> "./AtomVM"; + _ -> "AtomVM" + end + end. + +find_peer_avm() -> + case atomvm:posix_stat("./tests/libs/estdlib/test_serial_dist_socat_peer.avm") of + {ok, _} -> "./tests/libs/estdlib/test_serial_dist_socat_peer.avm"; + _ -> "test_serial_dist_socat_peer.avm" + end. + +read_line(Fd) -> + read_line(Fd, <<>>). + +read_line(Fd, Acc) -> + case atomvm:posix_read(Fd, 1) of + {ok, <<$\n>>} -> + Acc; + {ok, Byte} -> + read_line(Fd, <>); + {error, eagain} -> + ok = atomvm:posix_select_read(Fd, self(), undefined), + receive + {select, _FdRes, undefined, ready_input} -> ok + after 5000 -> + exit(socat_read_timeout) + end, + read_line(Fd, Acc) + end. + +read_peer_line(Fd) -> + Line = read_peer_line(Fd, <<>>), + %% Skip JIT compilation output, OTP reports, and other diagnostic lines + case Line of + <<"Compilation of ", _/binary>> -> read_peer_line(Fd); + <<"+Compilation of ", _/binary>> -> read_peer_line(Fd); + <<"Unable to open ", _/binary>> -> read_peer_line(Fd); + <<"Failed load module", _/binary>> -> read_peer_line(Fd); + <<"Warning", _/binary>> -> read_peer_line(Fd); + <<"=", _/binary>> -> read_peer_line(Fd); + <<" ", _/binary>> -> read_peer_line(Fd); + <<>> -> read_peer_line(Fd); + _ -> Line + end. + +read_peer_line(Fd, Acc) -> + case atomvm:posix_read(Fd, 1) of + {ok, <<$\n>>} -> + Acc; + {ok, <<$\r>>} -> + read_peer_line(Fd, Acc); + {ok, Byte} -> + read_peer_line(Fd, <>); + {error, eagain} -> + ok = atomvm:posix_select_read(Fd, self(), undefined), + receive + {select, _FdRes, undefined, ready_input} -> ok + after 30000 -> + exit({peer_read_timeout, got_so_far, Acc}) + end, + read_peer_line(Fd, Acc); + eof -> + Acc + end. + +drain_peer_output(Fd) -> + case atomvm:posix_read(Fd, 4096) of + {ok, Data} -> + io:format("test_serial_dist_socat: peer output: ~s~n", [Data]), + drain_peer_output(Fd); + {error, eagain} -> + ok = atomvm:posix_select_read(Fd, self(), undefined), + receive + {select, _FdRes, undefined, ready_input} -> + case atomvm:posix_read(Fd, 4096) of + {ok, Data} -> + io:format("test_serial_dist_socat: peer output: ~s~n", [Data]); + _ -> + ok + end + after 2000 -> + io:format("test_serial_dist_socat: no more peer output~n") + end; + eof -> + io:format("test_serial_dist_socat: peer EOF~n"); + {error, Reason} -> + io:format("test_serial_dist_socat: peer read error: ~p~n", [Reason]) + end. + +extract_pty(Line) -> + case binary:match(Line, <<"PTY is ">>) of + {Pos, Len} -> + binary:part(Line, Pos + Len, byte_size(Line) - Pos - Len); + nomatch -> + exit({unexpected_socat_output, Line}) + end. diff --git a/tests/libs/estdlib/test_serial_dist_socat_peer.erl b/tests/libs/estdlib/test_serial_dist_socat_peer.erl new file mode 100644 index 0000000000..3f0895ab6f --- /dev/null +++ b/tests/libs/estdlib/test_serial_dist_socat_peer.erl @@ -0,0 +1,72 @@ +% +% This file is part of AtomVM. +% +% Copyright 2026 Paul Guyot +% +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% See the License for the specific language governing permissions and +% limitations under the License. +% +% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later + +%% @doc Peer node for serial distribution tests. +%% Reads PTY_PATH and TEST_NAME from environment variables. +%% Derives a unique node name from the PTY device path so that +%% multiple peers can connect to the same orchestrator simultaneously. + +-module(test_serial_dist_socat_peer). + +-export([start/0, run_test/2]). + +start() -> + PtyPath = os:getenv("PTY_PATH"), + TestName = os:getenv("TEST_NAME"), + %% Derive node name from PTY, e.g. "/dev/ttys035" -> 'ttys035@serial.local' + PtyBase = lists:last(string:split(PtyPath, "/", all)), + NodeName = list_to_atom(PtyBase ++ "@serial.local"), + %% Force JIT compilation of modules used during handshake + %% so their compilation doesn't eat into the handshake timer. + _ = crypto:module_info(), + _ = dist_util:module_info(), + _ = uart:module_info(), + _ = timer_manager:module_info(), + {ok, _} = net_kernel:start(NodeName, #{ + name_domain => longnames, + proto_dist => serial_dist, + avm_dist_opts => #{ + uart_opts => [{peripheral, PtyPath}, {speed, 115200}], + uart_module => uart + } + }), + erlang:set_cookie('SerialTest'), + run_test('a@serial.local', TestName). + +run_test(PeerNode, TestName) -> + case TestName of + "ping" -> + {test_serial, PeerNode} ! {self(), ping}, + receive + {_Pid, pong} -> + io:format("pong~n") + after 30000 -> + io:format("timeout~n"), + error(timeout) + end; + "rpc" -> + {test_serial, PeerNode} ! {self(), {apply, erlang, system_info, [machine]}}, + receive + {_Pid, Result} -> + io:format("~s~n", [Result]) + after 30000 -> + io:format("timeout~n"), + error(timeout) + end + end. diff --git a/tests/libs/estdlib/tests.erl b/tests/libs/estdlib/tests.erl index 1cdd8f9043..922dbe183a 100644 --- a/tests/libs/estdlib/tests.erl +++ b/tests/libs/estdlib/tests.erl @@ -80,7 +80,9 @@ get_non_networking_tests(_OTPVersion) -> test_os, test_file, test_filename, - test_uart + test_serial_dist, + test_uart, + test_serial_dist_socat ]. get_networking_tests() ->