Skip to content
This repository was archived by the owner on Jan 23, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions packages/jumpstarter-driver-pyserial/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,108 @@ export:
| check_present | Check if the serial port exists during exporter initialization, disable if you are connecting to a dynamically created port (i.e. USB from your DUT) | bool | no | True |
| cps | Characters per second throttling limit. When set, data transmission will be throttled to simulate slow typing. Useful for devices that can't handle fast input | float | no | None |

## NVDemuxSerial Driver

The `NVDemuxSerial` driver provides serial access to NVIDIA Tegra demultiplexed UART channels using the [nv_tcu_demuxer](https://docs.nvidia.com/jetson/archives/r38.2.1/DeveloperGuide/AT/JetsonLinuxDevelopmentTools/TegraCombinedUART.html) tool. It automatically handles device reconnection when the target device restarts.

The nv_tcu_demuxer tool can be obtained from the NVIDIA Jetson BSP, at this path: `Linux_for_Tegra/tools/demuxer/nv_tcu_demuxer`.

### Multi-Instance Support

Multiple driver instances can share a single demuxer process by specifying different target channels. This allows simultaneous access to multiple UART channels (CCPLEX, BPMP, SCE, etc.) from the same physical device.

### Configuration

#### Single channel example:

```yaml
export:
ccplex:
type: jumpstarter_driver_pyserial.nvdemux.driver.NVDemuxSerial
config:
demuxer_path: "/opt/nvidia/nv_tcu_demuxer"
# device defaults to auto-detect NVIDIA Tegra On-Platform Operator
# chip defaults to T264 (Thor), use T234 for Orin
```

#### Multiple channels example:

```yaml
export:
ccplex:
type: jumpstarter_driver_pyserial.nvdemux.driver.NVDemuxSerial
config:
demuxer_path: "/opt/nvidia/nv_tcu_demuxer"
target: "CCPLEX: 0"
chip: "T264"

bpmp:
type: jumpstarter_driver_pyserial.nvdemux.driver.NVDemuxSerial
config:
demuxer_path: "/opt/nvidia/nv_tcu_demuxer"
target: "BPMP: 1"
chip: "T264"

sce:
type: jumpstarter_driver_pyserial.nvdemux.driver.NVDemuxSerial
config:
demuxer_path: "/opt/nvidia/nv_tcu_demuxer"
target: "SCE: 2"
chip: "T264"
```

### Config parameters

| Parameter | Description | Type | Required | Default |
| -------------- | ----------------------------------------------------------------------------------------------- | ----- | -------- | ------------------------------------------------------------------------- |
| demuxer_path | Path to the `nv_tcu_demuxer` binary | str | yes | |
| device | Device path or glob pattern for auto-detection | str | no | `/dev/serial/by-id/usb-NVIDIA_Tegra_On-Platform_Operator_*-if01` |
| target | Target channel to extract from demuxer output | str | no | `CCPLEX: 0` |
| chip | Chip type for demuxer (`T234` for Orin, `T264` for Thor) | str | no | `T264` |
| baudrate | Baud rate for the serial connection | int | no | 115200 |
| cps | Characters per second throttling limit | float | no | None |
| timeout | Timeout in seconds waiting for demuxer to detect pts | float | no | 10.0 |
| poll_interval | Interval in seconds to poll for device reappearance after disconnect | float | no | 1.0 |

### Device Auto-Detection

The `device` parameter supports glob patterns for automatic device discovery:

```yaml
# Auto-detect any NVIDIA Tegra On-Platform Operator device (default)
device: "/dev/serial/by-id/usb-NVIDIA_Tegra_On-Platform_Operator_*-if01"

# Specific serial number
device: "/dev/serial/by-id/usb-NVIDIA_Tegra_On-Platform_Operator_ABC123-if01"

# Direct device path (no glob)
device: "/dev/ttyUSB0"
```

### Auto-Recovery

When the target device restarts (e.g., power cycle), the serial device disappears and the demuxer exits. The driver automatically:

1. Detects the device disconnection
2. Polls for the device to reappear
3. Restarts the demuxer with the new device
4. Discovers the new pts path (which changes on each restart)

Active connections will receive errors when the device disconnects. Clients should reconnect, and the driver will wait for the device to be available again.

### Configuration Validation / Limitations

When using multiple driver instances, all instances must have compatible configurations:

- **demuxer_path**: Must be identical across all instances
- **device**: Must be identical across all instances
- **chip**: Must be identical across all instances
- **target**: Must be unique for each instance (no duplicates allowed)

If these requirements are not met, the driver will raise a `ValueError` during initialization.



## CLI Commands

The pyserial driver provides two CLI commands for interacting with serial ports:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import threading
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from typing import Optional

from anyio import sleep
from anyio._backends._asyncio import StreamReaderWrapper, StreamWriterWrapper
from serial_asyncio import open_serial_connection

from ..driver import AsyncSerial
from .manager import DemuxerManager
from jumpstarter.driver import Driver, exportstream
Comment thread
mangelajo marked this conversation as resolved.

# Default glob pattern for NVIDIA Tegra On-Platform Operator devices
NV_DEVICE_PATTERN = "/dev/serial/by-id/usb-NVIDIA_Tegra_On-Platform_Operator_*-if01"


@dataclass(kw_only=True)
class NVDemuxSerial(Driver):
"""Serial driver for NVIDIA TCU demultiplexed UART channels.

This driver wraps the nv_tcu_demuxer tool to extract a specific demultiplexed
UART channel (like CCPLEX) from a multiplexed serial device. Multiple driver
instances can share the same demuxer process by specifying different targets.

Args:
demuxer_path: Path to the nv_tcu_demuxer binary
device: Device path or glob pattern for auto-detection.
Default: /dev/serial/by-id/usb-NVIDIA_Tegra_On-Platform_Operator_*-if01
target: Target channel to extract (e.g., "CCPLEX: 0", "BPMP: 1")
chip: Chip type for demuxer (T234 for Orin, T264 for Thor)
baudrate: Baud rate for the serial connection
cps: Characters per second throttling (optional)
timeout: Timeout waiting for demuxer to detect pts
poll_interval: Interval to poll for device reappearance after disconnect

Note:
Multiple instances can be created with different targets. All instances
must use the same demuxer_path, device, and chip configuration.
"""

demuxer_path: str
device: str = field(default=NV_DEVICE_PATTERN)
target: str = field(default="CCPLEX: 0")
chip: str = field(default="T264")
baudrate: int = field(default=115200)
cps: Optional[float] = field(default=None)
timeout: float = field(default=10.0)
poll_interval: float = field(default=1.0)

# Internal state (not init params)
_ready: threading.Event = field(init=False, default_factory=threading.Event)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

_registered: bool = field(init=False, default=False)

def __post_init__(self):
if hasattr(super(), "__post_init__"):
super().__post_init__()

# Register with the DemuxerManager
manager = DemuxerManager.get_instance()
try:
manager.register_driver(
driver_id=str(self.uuid),
demuxer_path=self.demuxer_path,
device=self.device,
chip=self.chip,
target=self.target,
callback=self._on_target_ready,
poll_interval=self.poll_interval,
)
self._registered = True
except ValueError as e:
self.logger.error("Failed to register with DemuxerManager: %s", e)
raise


@classmethod
def client(cls) -> str:
return "jumpstarter_driver_pyserial.client.PySerialClient"

def _on_target_ready(self, target: str, pts_path: str):
"""Callback invoked by DemuxerManager when target becomes ready.

Args:
target: The target channel that became ready
pts_path: The pts path for this target
"""
self.logger.info("Target '%s' ready with pts path: %s", target, pts_path)
self._ready.set()

def close(self):
"""Unregister from the DemuxerManager."""
if self._registered:
manager = DemuxerManager.get_instance()
manager.unregister_driver(str(self.uuid))
self._registered = False

super().close()

@exportstream
@asynccontextmanager
async def connect(self):
"""Connect to the demultiplexed serial port.

Waits for the demuxer to be ready (device connected and pts path discovered)
before opening the serial connection.
"""
# Wait for ready state
start_time = time.monotonic()
while not self._ready.is_set():
elapsed = time.monotonic() - start_time
if elapsed >= self.timeout:
raise TimeoutError(
f"Timeout waiting for demuxer to become ready (device pattern: {self.device})"
)
# Use a short sleep to allow checking ready state
await sleep(0.1)

# Get the current pts path from manager (retry until timeout)
manager = DemuxerManager.get_instance()
pts_start = time.monotonic()
pts_path = manager.get_pts_path(str(self.uuid))
while not pts_path:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How come ready is already set but pts is still not available?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hehe, that was not obvious for Claude either, I had to add this.

Because the manager can be ready (it stays up, ready to launch, start, restart the muxer), but the PTS could not be ready (the muxer just went offline, so no ports...) :)

This became important when I started powering off... trying to connect
ir power cycling and trying to connect too quickly, now it just wait until the muxer is up again and the PTS is ready :)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You were actually right, I was confused by previous implementation, the ready event, and this was redundant, in the end I removed the callback and just kept the polling mechanism of this loop.

elapsed = time.monotonic() - pts_start
if elapsed >= self.timeout:
raise TimeoutError("Demuxer ready but no pts path available after retrying")
await sleep(self.poll_interval)
pts_path = manager.get_pts_path(str(self.uuid))

cps_info = f", cps: {self.cps}" if self.cps is not None else ""
self.logger.info("Connecting to %s, baudrate: %d%s", pts_path, self.baudrate, cps_info)

reader, writer = await open_serial_connection(url=pts_path, baudrate=self.baudrate, limit=1)
writer.transport.set_write_buffer_limits(high=4096, low=0)
async with AsyncSerial(
reader=StreamReaderWrapper(reader),
writer=StreamWriterWrapper(writer),
cps=self.cps,
) as stream:
yield stream
self.logger.info("Disconnected from %s", pts_path)
Loading