Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 161 additions & 0 deletions examples/file_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
#!/usr/bin/env python3
"""
Read a file from a Cyphal/UDP file server and write it to stdout.
Usage:
python examples/file_client.py <file> > copy.bin
"""

from __future__ import annotations

import argparse
import asyncio
import logging
import os
import struct
import sys
from dataclasses import dataclass
from pathlib import Path

from pycyphal2 import DeliveryError, Instant, LivenessError, Node, ResponseStream, SendError
from pycyphal2.udp import UDPTransport

NAME = f"{Path(__file__).stem}/" # The trailing separator ensures that a random ID will be added.
TOPIC = "file/read"
PATH_MAX_LEN = 2048
DATA_MAX = 4096
RESPONSE_TIMEOUT = 30.0
REQUEST_DELIVERY_TIMEOUT = RESPONSE_TIMEOUT / 2.0
REQUEST_HEADER_FORMAT = "<QH"
RESPONSE_HEADER_FORMAT = "<IH"
RESPONSE_HEADER_SIZE = struct.calcsize(RESPONSE_HEADER_FORMAT)
Comment on lines +27 to +30

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These belong in the dataclasses and they should be private with exposed serialize/deserialize methods. Maybe we should even move them into a shared module to reduce duplication between the examples.


_logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class FileReadResponse:
error: int
data: bytes


def _encode_request(file_path: str, read_offset: int) -> bytes:

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same, broken encapsulation

encoded_path = file_path.encode("utf8")
if len(encoded_path) > PATH_MAX_LEN:
raise ValueError(f"File path length {len(encoded_path)} is too long")
return struct.pack(REQUEST_HEADER_FORMAT, read_offset, len(encoded_path)) + encoded_path


def _decode_response(payload: bytes) -> FileReadResponse | None:
if len(payload) < RESPONSE_HEADER_SIZE:
return None
error, data_len = struct.unpack_from(RESPONSE_HEADER_FORMAT, payload)
if data_len > DATA_MAX:
return None
data_start = RESPONSE_HEADER_SIZE
data_end = data_start + data_len
if len(payload) != data_end:
return None
return FileReadResponse(error=error, data=payload[data_start:data_end])


def _format_remote_error(error: int) -> str:
try:
message = os.strerror(error)
except (OverflowError, ValueError):
message = "unknown error"
return f"Remote error {error}: {message}"


async def _receive_response(stream: ResponseStream, expected_server_id: int | None) -> tuple[int, FileReadResponse]:
async for response in stream:
if expected_server_id is not None and response.remote_id != expected_server_id:
_logger.info(
"ignoring response from redundant server %016x, expected %016x",
response.remote_id,
expected_server_id,
)
continue
decoded = _decode_response(response.message)
if decoded is None:
_logger.debug("dropping malformed response from %016x seq=%d", response.remote_id, response.seqno)
continue
return response.remote_id, decoded
raise LivenessError("Response stream closed")


async def _receive_valid_response(
stream: ResponseStream, expected_server_id: int | None, response_timeout: float
) -> tuple[int, FileReadResponse]:
try:
return await asyncio.wait_for(_receive_response(stream, expected_server_id), timeout=response_timeout)
except asyncio.TimeoutError as ex:
raise LivenessError("Response timeout") from ex


async def run(file_path: str) -> int:
transport = UDPTransport.new()
node = Node.new(transport, NAME)
pub = node.advertise(TOPIC)
discovered_server_id: int | None = None
read_offset = 0
_logger.info("file client ready on %r via %s", TOPIC, transport)
try:
while True:
_logger.info("requesting offset %d", read_offset)
stream: ResponseStream | None = None
try:
request = _encode_request(file_path, read_offset)
stream = await pub.request(Instant.now() + REQUEST_DELIVERY_TIMEOUT, RESPONSE_TIMEOUT, request)
server_id, response = await _receive_valid_response(stream, discovered_server_id, RESPONSE_TIMEOUT)
except ValueError as ex:
sys.stderr.write(f"{ex}\n")
return 1
except DeliveryError:
sys.stderr.write("Request delivery failed\n")
return 1
except LivenessError:
sys.stderr.write("Response timeout\n")
return 1
except SendError as ex:
sys.stderr.write(f"Request send failed: {ex}\n")
return 1
finally:
if stream is not None:
stream.close()

if discovered_server_id is None:
discovered_server_id = server_id
_logger.info("discovered server UID: %016x", discovered_server_id)
_logger.info("received response: offset %d", read_offset)

if response.error != 0:
sys.stderr.write(_format_remote_error(response.error) + "\n")
return 1
if len(response.data) > 0:
sys.stdout.buffer.write(response.data)
sys.stdout.buffer.flush()
read_offset += len(response.data)
continue

_logger.info("finished transferring %d bytes", read_offset)
return 0
finally:
pub.close()
node.close()
transport.close()


def main() -> None:
parser = argparse.ArgumentParser(description="Read a file from a Cyphal/UDP file server.")
parser.add_argument("file", help="File path to read on the server")
parser.add_argument("-v", "--verbose", action="store_true", help="Enable debug logging")
args = parser.parse_args()
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO, format="%(levelname)s: %(message)s")
try:
raise SystemExit(asyncio.run(run(args.file)))
except KeyboardInterrupt:
pass


if __name__ == "__main__":
main()
150 changes: 150 additions & 0 deletions examples/file_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
#!/usr/bin/env python3
"""
Serve file chunks over a tiny Cyphal/UDP RPC.
Usage:
python examples/file_server.py
"""

from __future__ import annotations

import asyncio
import errno
import logging
import struct
from dataclasses import dataclass
from functools import partial
from pathlib import Path

from pycyphal2 import Arrival, DeliveryError, NackError, Node, SendError
from pycyphal2.udp import UDPTransport

NAME = f"{Path(__file__).stem}/" # The trailing separator ensures that a random ID will be added.
TOPIC = "file/read"
PATH_MAX_LEN = 2048
DATA_MAX = 4096
RESPONSE_DEADLINE = 10.0
REQUEST_HEADER_FORMAT = "<QH"
REQUEST_HEADER_SIZE = struct.calcsize(REQUEST_HEADER_FORMAT)
RESPONSE_HEADER_FORMAT = "<IH"

_logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class FileReadRequest:
read_offset: int
file_path: str


@dataclass(frozen=True)
class FileReadResponse:
error: int
data: bytes


def _decode_request(payload: bytes) -> FileReadRequest | None:
if len(payload) < REQUEST_HEADER_SIZE:
return None
read_offset, path_len = struct.unpack_from(REQUEST_HEADER_FORMAT, payload)
if path_len == 0 or path_len > PATH_MAX_LEN:
return None
path_end = REQUEST_HEADER_SIZE + path_len
if len(payload) != path_end:
return None
try:
file_path = payload[REQUEST_HEADER_SIZE:path_end].decode("utf8")
except UnicodeDecodeError:
return None
return FileReadRequest(read_offset=read_offset, file_path=file_path)


def _encode_response(response: FileReadResponse) -> bytes:
if len(response.data) > DATA_MAX:
raise ValueError(f"Response data is too large: {len(response.data)}")
return struct.pack(RESPONSE_HEADER_FORMAT, response.error, len(response.data)) + response.data


def _errno_from_exception(ex: BaseException) -> int:
if isinstance(ex, OSError) and ex.errno is not None:
return ex.errno
if isinstance(ex, OverflowError):
return getattr(errno, "EOVERFLOW", errno.EINVAL)
return errno.EINVAL


def _read_chunk(file_path: str, offset: int) -> FileReadResponse:
try:
with open(file_path, "rb") as file:
file.seek(offset)
data = file.read(DATA_MAX)
except (OSError, ValueError, OverflowError) as ex:
return FileReadResponse(error=_errno_from_exception(ex), data=b"")
return FileReadResponse(error=0, data=data)


async def _serve_request(arrival: Arrival, request: FileReadRequest) -> None:
response = _read_chunk(request.file_path, request.read_offset)
payload = _encode_response(response)
_logger.info(
"responding: file=%r offset=%d size=%d error=%d",
request.file_path,
request.read_offset,
len(response.data),
response.error,
)
try:
await arrival.breadcrumb(arrival.timestamp + RESPONSE_DEADLINE, payload, reliable=True)
except NackError:
_logger.info("client rejected response: remote=%016x file=%r", arrival.breadcrumb.remote_id, request.file_path)
except DeliveryError:
_logger.info(
"client did not acknowledge: remote=%016x file=%r", arrival.breadcrumb.remote_id, request.file_path
)
except SendError as ex:
_logger.warning("response send failed: remote=%016x error=%s", arrival.breadcrumb.remote_id, ex)


def _on_task_done(tasks: set[asyncio.Task[None]], task: asyncio.Task[None]) -> None:
tasks.discard(task)
if task.cancelled():
return
exc = task.exception()
if exc is not None:
_logger.error("file request task failed: %s", exc)


async def run() -> None:
transport = UDPTransport.new()
node = Node.new(transport, NAME)
sub = node.subscribe(TOPIC)
tasks: set[asyncio.Task[None]] = set()
_logger.info("file server ready on %r via %s", TOPIC, transport)
try:
async for arrival in sub:
request = _decode_request(arrival.message)
if request is None:
_logger.debug("dropping malformed request of size %d", len(arrival.message))
continue
task = asyncio.create_task(_serve_request(arrival, request), name=f"file:{arrival.breadcrumb.tag}")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this example would benefit from doing it sequentially to focus more on Cyphal rather than asyncio.

tasks.add(task)
task.add_done_callback(partial(_on_task_done, tasks))
finally:
sub.close()
for task in list(tasks):
task.cancel()
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
node.close()
transport.close()


def main() -> None:
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
try:
asyncio.run(run())
except KeyboardInterrupt:
pass


if __name__ == "__main__":
main()
46 changes: 46 additions & 0 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def examples(session: nox.Session) -> None:
import json as _json
import subprocess
import sys
import tempfile
import time

if sys.platform == "darwin":
Expand Down Expand Up @@ -145,9 +146,54 @@ def run_streaming_case() -> None:
assert "remaining" in obj
assert "sent_at" in obj

def run_file_case() -> None:
session.log("--- examples smoke: file RPC ---")
server_proc = None
client_proc = None
missing_proc = None
with tempfile.TemporaryDirectory() as tmp_dir:
file_path = Path(tmp_dir) / "source.bin"
payload = bytes(range(256)) * 20
file_path.write_bytes(payload)
try:
server_proc = subprocess.Popen(
[python, "examples/file_server.py"],
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
text=True,
)
time.sleep(1)
client_proc = subprocess.Popen(
[python, "examples/file_client.py", str(file_path)],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
client_stdout, client_stderr = client_proc.communicate(timeout=30)
assert client_proc.returncode == 0, f"File client failed: {client_stderr.decode()}"
assert client_stdout == payload

missing_proc = subprocess.Popen(
[python, "examples/file_client.py", str(file_path.with_name("missing.bin"))],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
missing_stdout, missing_stderr = missing_proc.communicate(timeout=30)
assert missing_proc.returncode == 1
assert missing_stdout == ""
assert "Remote error" in missing_stderr

time.sleep(1)
assert server_proc.poll() is None, "File server exited unexpectedly"
finally:
terminate_process(client_proc)
terminate_process(missing_proc)
terminate_process(server_proc)

run_case("udp", [])
if sys.platform == "linux" and Path("/sys/class/net/vcan0").exists():
run_case("socketcan:vcan0", ["--transport", "socketcan:vcan0"])
else:
session.log("Skipping socketcan:vcan0 case (vcan0 not available)")
run_streaming_case()
run_file_case()
Loading
Loading