From 701e2f80479a62de0d6fabe16fcc99afe31ed402 Mon Sep 17 00:00:00 2001 From: Maksim Drachov Date: Mon, 29 Jun 2026 13:10:13 +0300 Subject: [PATCH] Codex implementes the file_client and file_server examples based on cy --- examples/file_client.py | 161 ++++++++++++++++++++++++++++++++++++++++ examples/file_server.py | 150 +++++++++++++++++++++++++++++++++++++ noxfile.py | 46 ++++++++++++ tests/test_examples.py | 47 ++++++++++++ 4 files changed, 404 insertions(+) create mode 100755 examples/file_client.py create mode 100755 examples/file_server.py create mode 100644 tests/test_examples.py diff --git a/examples/file_client.py b/examples/file_client.py new file mode 100755 index 00000000..8e8864e9 --- /dev/null +++ b/examples/file_client.py @@ -0,0 +1,161 @@ +#!/usr/bin/env python3 +""" +Read a file from a Cyphal/UDP file server and write it to stdout. +Usage: + python examples/file_client.py > copy.bin +""" + +from __future__ import annotations + +import argparse +import asyncio +import logging +import os +import struct +import sys +from dataclasses import dataclass +from pathlib import Path + +from pycyphal2 import DeliveryError, Instant, LivenessError, Node, ResponseStream, SendError +from pycyphal2.udp import UDPTransport + +NAME = f"{Path(__file__).stem}/" # The trailing separator ensures that a random ID will be added. +TOPIC = "file/read" +PATH_MAX_LEN = 2048 +DATA_MAX = 4096 +RESPONSE_TIMEOUT = 30.0 +REQUEST_DELIVERY_TIMEOUT = RESPONSE_TIMEOUT / 2.0 +REQUEST_HEADER_FORMAT = " bytes: + encoded_path = file_path.encode("utf8") + if len(encoded_path) > PATH_MAX_LEN: + raise ValueError(f"File path length {len(encoded_path)} is too long") + return struct.pack(REQUEST_HEADER_FORMAT, read_offset, len(encoded_path)) + encoded_path + + +def _decode_response(payload: bytes) -> FileReadResponse | None: + if len(payload) < RESPONSE_HEADER_SIZE: + return None + error, data_len = struct.unpack_from(RESPONSE_HEADER_FORMAT, payload) + if data_len > DATA_MAX: + return None + data_start = RESPONSE_HEADER_SIZE + data_end = data_start + data_len + if len(payload) != data_end: + return None + return FileReadResponse(error=error, data=payload[data_start:data_end]) + + +def _format_remote_error(error: int) -> str: + try: + message = os.strerror(error) + except (OverflowError, ValueError): + message = "unknown error" + return f"Remote error {error}: {message}" + + +async def _receive_response(stream: ResponseStream, expected_server_id: int | None) -> tuple[int, FileReadResponse]: + async for response in stream: + if expected_server_id is not None and response.remote_id != expected_server_id: + _logger.info( + "ignoring response from redundant server %016x, expected %016x", + response.remote_id, + expected_server_id, + ) + continue + decoded = _decode_response(response.message) + if decoded is None: + _logger.debug("dropping malformed response from %016x seq=%d", response.remote_id, response.seqno) + continue + return response.remote_id, decoded + raise LivenessError("Response stream closed") + + +async def _receive_valid_response( + stream: ResponseStream, expected_server_id: int | None, response_timeout: float +) -> tuple[int, FileReadResponse]: + try: + return await asyncio.wait_for(_receive_response(stream, expected_server_id), timeout=response_timeout) + except asyncio.TimeoutError as ex: + raise LivenessError("Response timeout") from ex + + +async def run(file_path: str) -> int: + transport = UDPTransport.new() + node = Node.new(transport, NAME) + pub = node.advertise(TOPIC) + discovered_server_id: int | None = None + read_offset = 0 + _logger.info("file client ready on %r via %s", TOPIC, transport) + try: + while True: + _logger.info("requesting offset %d", read_offset) + stream: ResponseStream | None = None + try: + request = _encode_request(file_path, read_offset) + stream = await pub.request(Instant.now() + REQUEST_DELIVERY_TIMEOUT, RESPONSE_TIMEOUT, request) + server_id, response = await _receive_valid_response(stream, discovered_server_id, RESPONSE_TIMEOUT) + except ValueError as ex: + sys.stderr.write(f"{ex}\n") + return 1 + except DeliveryError: + sys.stderr.write("Request delivery failed\n") + return 1 + except LivenessError: + sys.stderr.write("Response timeout\n") + return 1 + except SendError as ex: + sys.stderr.write(f"Request send failed: {ex}\n") + return 1 + finally: + if stream is not None: + stream.close() + + if discovered_server_id is None: + discovered_server_id = server_id + _logger.info("discovered server UID: %016x", discovered_server_id) + _logger.info("received response: offset %d", read_offset) + + if response.error != 0: + sys.stderr.write(_format_remote_error(response.error) + "\n") + return 1 + if len(response.data) > 0: + sys.stdout.buffer.write(response.data) + sys.stdout.buffer.flush() + read_offset += len(response.data) + continue + + _logger.info("finished transferring %d bytes", read_offset) + return 0 + finally: + pub.close() + node.close() + transport.close() + + +def main() -> None: + parser = argparse.ArgumentParser(description="Read a file from a Cyphal/UDP file server.") + parser.add_argument("file", help="File path to read on the server") + parser.add_argument("-v", "--verbose", action="store_true", help="Enable debug logging") + args = parser.parse_args() + logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO, format="%(levelname)s: %(message)s") + try: + raise SystemExit(asyncio.run(run(args.file))) + except KeyboardInterrupt: + pass + + +if __name__ == "__main__": + main() diff --git a/examples/file_server.py b/examples/file_server.py new file mode 100755 index 00000000..a0c74139 --- /dev/null +++ b/examples/file_server.py @@ -0,0 +1,150 @@ +#!/usr/bin/env python3 +""" +Serve file chunks over a tiny Cyphal/UDP RPC. +Usage: + python examples/file_server.py +""" + +from __future__ import annotations + +import asyncio +import errno +import logging +import struct +from dataclasses import dataclass +from functools import partial +from pathlib import Path + +from pycyphal2 import Arrival, DeliveryError, NackError, Node, SendError +from pycyphal2.udp import UDPTransport + +NAME = f"{Path(__file__).stem}/" # The trailing separator ensures that a random ID will be added. +TOPIC = "file/read" +PATH_MAX_LEN = 2048 +DATA_MAX = 4096 +RESPONSE_DEADLINE = 10.0 +REQUEST_HEADER_FORMAT = " FileReadRequest | None: + if len(payload) < REQUEST_HEADER_SIZE: + return None + read_offset, path_len = struct.unpack_from(REQUEST_HEADER_FORMAT, payload) + if path_len == 0 or path_len > PATH_MAX_LEN: + return None + path_end = REQUEST_HEADER_SIZE + path_len + if len(payload) != path_end: + return None + try: + file_path = payload[REQUEST_HEADER_SIZE:path_end].decode("utf8") + except UnicodeDecodeError: + return None + return FileReadRequest(read_offset=read_offset, file_path=file_path) + + +def _encode_response(response: FileReadResponse) -> bytes: + if len(response.data) > DATA_MAX: + raise ValueError(f"Response data is too large: {len(response.data)}") + return struct.pack(RESPONSE_HEADER_FORMAT, response.error, len(response.data)) + response.data + + +def _errno_from_exception(ex: BaseException) -> int: + if isinstance(ex, OSError) and ex.errno is not None: + return ex.errno + if isinstance(ex, OverflowError): + return getattr(errno, "EOVERFLOW", errno.EINVAL) + return errno.EINVAL + + +def _read_chunk(file_path: str, offset: int) -> FileReadResponse: + try: + with open(file_path, "rb") as file: + file.seek(offset) + data = file.read(DATA_MAX) + except (OSError, ValueError, OverflowError) as ex: + return FileReadResponse(error=_errno_from_exception(ex), data=b"") + return FileReadResponse(error=0, data=data) + + +async def _serve_request(arrival: Arrival, request: FileReadRequest) -> None: + response = _read_chunk(request.file_path, request.read_offset) + payload = _encode_response(response) + _logger.info( + "responding: file=%r offset=%d size=%d error=%d", + request.file_path, + request.read_offset, + len(response.data), + response.error, + ) + try: + await arrival.breadcrumb(arrival.timestamp + RESPONSE_DEADLINE, payload, reliable=True) + except NackError: + _logger.info("client rejected response: remote=%016x file=%r", arrival.breadcrumb.remote_id, request.file_path) + except DeliveryError: + _logger.info( + "client did not acknowledge: remote=%016x file=%r", arrival.breadcrumb.remote_id, request.file_path + ) + except SendError as ex: + _logger.warning("response send failed: remote=%016x error=%s", arrival.breadcrumb.remote_id, ex) + + +def _on_task_done(tasks: set[asyncio.Task[None]], task: asyncio.Task[None]) -> None: + tasks.discard(task) + if task.cancelled(): + return + exc = task.exception() + if exc is not None: + _logger.error("file request task failed: %s", exc) + + +async def run() -> None: + transport = UDPTransport.new() + node = Node.new(transport, NAME) + sub = node.subscribe(TOPIC) + tasks: set[asyncio.Task[None]] = set() + _logger.info("file server ready on %r via %s", TOPIC, transport) + try: + async for arrival in sub: + request = _decode_request(arrival.message) + if request is None: + _logger.debug("dropping malformed request of size %d", len(arrival.message)) + continue + task = asyncio.create_task(_serve_request(arrival, request), name=f"file:{arrival.breadcrumb.tag}") + tasks.add(task) + task.add_done_callback(partial(_on_task_done, tasks)) + finally: + sub.close() + for task in list(tasks): + task.cancel() + if tasks: + await asyncio.gather(*tasks, return_exceptions=True) + node.close() + transport.close() + + +def main() -> None: + logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") + try: + asyncio.run(run()) + except KeyboardInterrupt: + pass + + +if __name__ == "__main__": + main() diff --git a/noxfile.py b/noxfile.py index 3adb2326..18003ec7 100644 --- a/noxfile.py +++ b/noxfile.py @@ -63,6 +63,7 @@ def examples(session: nox.Session) -> None: import json as _json import subprocess import sys + import tempfile import time if sys.platform == "darwin": @@ -145,9 +146,54 @@ def run_streaming_case() -> None: assert "remaining" in obj assert "sent_at" in obj + def run_file_case() -> None: + session.log("--- examples smoke: file RPC ---") + server_proc = None + client_proc = None + missing_proc = None + with tempfile.TemporaryDirectory() as tmp_dir: + file_path = Path(tmp_dir) / "source.bin" + payload = bytes(range(256)) * 20 + file_path.write_bytes(payload) + try: + server_proc = subprocess.Popen( + [python, "examples/file_server.py"], + stdout=subprocess.DEVNULL, + stderr=subprocess.PIPE, + text=True, + ) + time.sleep(1) + client_proc = subprocess.Popen( + [python, "examples/file_client.py", str(file_path)], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + client_stdout, client_stderr = client_proc.communicate(timeout=30) + assert client_proc.returncode == 0, f"File client failed: {client_stderr.decode()}" + assert client_stdout == payload + + missing_proc = subprocess.Popen( + [python, "examples/file_client.py", str(file_path.with_name("missing.bin"))], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + missing_stdout, missing_stderr = missing_proc.communicate(timeout=30) + assert missing_proc.returncode == 1 + assert missing_stdout == "" + assert "Remote error" in missing_stderr + + time.sleep(1) + assert server_proc.poll() is None, "File server exited unexpectedly" + finally: + terminate_process(client_proc) + terminate_process(missing_proc) + terminate_process(server_proc) + run_case("udp", []) if sys.platform == "linux" and Path("/sys/class/net/vcan0").exists(): run_case("socketcan:vcan0", ["--transport", "socketcan:vcan0"]) else: session.log("Skipping socketcan:vcan0 case (vcan0 not available)") run_streaming_case() + run_file_case() diff --git a/tests/test_examples.py b/tests/test_examples.py new file mode 100644 index 00000000..a5917fd5 --- /dev/null +++ b/tests/test_examples.py @@ -0,0 +1,47 @@ +from __future__ import annotations + +import asyncio +import errno +import struct + +import pytest + +from examples.file_client import _decode_response, _format_remote_error, _receive_valid_response +from examples.file_server import _decode_request, _read_chunk +from pycyphal2 import Instant, LivenessError, Response, ResponseStream + + +def test_file_example_decoders_reject_trailing_garbage() -> None: + assert _decode_request(struct.pack(" None: + assert _read_chunk("bad\0path", 0).error == errno.EINVAL + assert _read_chunk("pyproject.toml", (1 << 64) - 1).error != 0 + + +def test_file_client_formats_unknown_remote_error() -> None: + assert "4294967295" in _format_remote_error(0xFFFFFFFF) + + +class _MalformedResponseStream(ResponseStream): + def __init__(self) -> None: + self._seqno = 0 + + def __aiter__(self) -> _MalformedResponseStream: + return self + + async def __anext__(self) -> Response: + await asyncio.sleep(0) + seqno = self._seqno + self._seqno += 1 + return Response(timestamp=Instant.now(), remote_id=123, seqno=seqno, message=b"x") + + def close(self) -> None: + pass + + +async def test_file_client_absolute_response_timeout() -> None: + with pytest.raises(LivenessError): + await _receive_valid_response(_MalformedResponseStream(), None, 0.01)