feat: add remote l3 python session runtime#1011
📝 Walkthrough
Adds a complete Remote L3 orchestration stack to the
Changes: Remote L3 Worker/Session Infrastructure
Sequence Diagram(s)
sequenceDiagram
rect rgba(70, 130, 180, 0.5)
Note over ClientCode,Worker: Registration
ClientCode->>Worker: add_remote_worker(RemoteWorkerSpec)
ClientCode->>Worker: register(RemoteCallable, workers=[endpoint_id])
Worker->>RemoteL3WorkerDaemon: TCP JSON manifest
RemoteL3WorkerDaemon->>RemoteL3Session: spawn subprocess --manifest --ready-fd
RemoteL3Session-->>RemoteL3WorkerDaemon: ready JSON (cmd_port, health_port)
RemoteL3WorkerDaemon-->>Worker: session metadata + PID
Worker->>RemoteL3Session: CONTROL prepare (callable descriptor + digest)
Worker->>RemoteL3Session: CONTROL commit
RemoteL3Session-->>Worker: ACK
end
rect rgba(100, 160, 100, 0.5)
Note over ClientCode,RemoteL3Session: Task Submission
ClientCode->>Worker: submit_next_level(handle, TaskArgs with RemoteTensorRef)
Worker->>Orchestrator: submit_next_level(handle, args)
Orchestrator->>Orchestrator: _split_next_level_args → remote_sidecar
Orchestrator->>Orchestrator: _remote_data_eligible_endpoint_ids → final_endpoint_ids
Orchestrator->>CppRuntime: submit_next_level(args, final_endpoint_ids, remote_sidecar)
CppRuntime->>RemoteL3Session: TASK frame (tensor sidecar descriptors)
RemoteL3Session->>RemoteL3Session: _materialize_task_args (HOST_INLINE / shm buffer)
RemoteL3Session->>InnerWorker: run(callable, materialized_args)
InnerWorker-->>RemoteL3Session: result
RemoteL3Session-->>CppRuntime: completion frame
CppRuntime-->>Orchestrator: task complete
end
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
🚥 Pre-merge checks: ✅ 5 passed
Code Review
This pull request introduces remote L3 execution capabilities, adding a remote control daemon (remote_l3_worker.py), a session runner (remote_l3_session.py), and remote memory management APIs to Worker and TaskArgs. Key feedback includes addressing a zombie process leak in the daemon when spawning subprocesses, replacing id(self) dictionary keys with a WeakKeyDictionary in task_interface.py to prevent memory leaks and correctness bugs, closing leaked connection sockets in the health loop, and setting a timeout on command_sock.accept() to avoid indefinite hangs.
| def serve(host: str, port: int) -> int: | ||
| server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
The daemon process spawns session runner subprocesses via subprocess.Popen in _start_session, but it never reaps them when they exit. Because the daemon is long-running, this will accumulate zombie processes (defunct processes) over time, eventually exhausting the system's PID space.
To prevent this zombie process leak on POSIX systems, configure the daemon to automatically reap child processes by ignoring SIGCHLD at the start of serve.
def serve(host: str, port: int) -> int:
    import signal
    try:
        signal.signal(signal.SIGCHLD, signal.SIG_IGN)
    except AttributeError:
        pass
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
| _REMOTE_TASK_ARGS_STORAGE: dict[int, _RemoteTaskArgsStorage] = {} | ||
| _REMOTE_TASK_ARGS_STORAGE_LOCK = threading.Lock() |
Using id(self) as a key in a standard global dictionary (_REMOTE_TASK_ARGS_STORAGE) introduces both a memory leak and a correctness bug:
- Memory Leak: The `_RemoteTaskArgsStorage` entries are never removed when `TaskArgs` objects are garbage collected (unless explicitly cleared), leading to a persistent memory leak.
- Correctness Bug: Python reuses `id()` values for newly allocated objects once the old object is garbage collected. If a new `TaskArgs` object is allocated with the same `id` as a recently destroyed one, it will silently inherit the stale remote sidecars of the old object, leading to incorrect task arguments and undefined behavior.
To resolve this, use weakref.WeakKeyDictionary to automatically clean up entries when TaskArgs is garbage collected, and use the TaskArgs object itself as the key instead of its id(). Ensure you update other functions accessing this dictionary (such as _storage_for_remote_task_args, _task_args_add_tensor, _task_args_clear, and _remote_sidecar_for) to use self or args directly as the key.
- _REMOTE_TASK_ARGS_STORAGE: dict[int, _RemoteTaskArgsStorage] = {}
- _REMOTE_TASK_ARGS_STORAGE_LOCK = threading.Lock()
+ import weakref
+ _REMOTE_TASK_ARGS_STORAGE = weakref.WeakKeyDictionary()
+ _REMOTE_TASK_ARGS_STORAGE_LOCK = threading.Lock()
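A minimal sketch of storage tied to object lifetime, assuming the key type supports weak references (plain Python classes do; an extension-bound `TaskArgs` may not, in which case this would not apply as written). `FakeTaskArgs`, `SidecarStorage`, and `storage_for` are hypothetical stand-ins, not the project's code.

```python
import gc
import threading
import weakref
from dataclasses import dataclass, field


class FakeTaskArgs:
    """Hypothetical stand-in for TaskArgs; plain Python classes support weakrefs."""


@dataclass
class SidecarStorage:
    """Hypothetical per-object side storage."""
    sidecars: list = field(default_factory=list)
    inline_payload: bytearray = field(default_factory=bytearray)


_STORAGE = weakref.WeakKeyDictionary()
_STORAGE_LOCK = threading.Lock()


def storage_for(args: FakeTaskArgs) -> SidecarStorage:
    """Return the storage bound to this exact object, creating it on first use."""
    with _STORAGE_LOCK:
        storage = _STORAGE.get(args)
        if storage is None:
            storage = SidecarStorage()
            _STORAGE[args] = storage
        return storage


first = FakeTaskArgs()
storage_for(first).sidecars.append("stale-sidecar")
entries_while_alive = len(_STORAGE)

del first      # drop the only strong reference
gc.collect()   # not required on CPython, but makes the intent explicit
entries_after_collect = len(_STORAGE)

second = FakeTaskArgs()  # may reuse the old id(), but starts with empty storage
second_sidecars = storage_for(second).sidecars
```

Because the entry disappears together with its key, a later object that happens to reuse the same `id()` cannot observe the earlier object's sidecars.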
| finally: | ||
| try: | ||
| sock.close() | ||
| except OSError: | ||
| pass |
If the health loop exits (e.g., when stop.is_set() becomes true), the active connection socket conn is leaked if it is currently open. Only the listening socket sock is closed in the finally block.
To prevent leaking the connection socket, ensure conn is also closed in the finally block if it is not None.
- finally:
-     try:
-         sock.close()
-     except OSError:
-         pass
+ finally:
+     if conn is not None:
+         try:
+             conn.close()
+         except OSError:
+             pass
+     try:
+         sock.close()
+     except OSError:
+         pass
References
- When a function call can raise an exception, and there is cleanup logic that must run afterward (e.g., releasing resources), wrap the call in a `try...finally` block to ensure the cleanup logic is always executed.
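The cleanup pattern from this comment can be sketched as a small, self-contained health loop. This is an illustration under assumptions, not the session runner's actual loop: `health_loop`, the echo reply, and the 50 ms poll interval are all hypothetical.

```python
import socket
import threading


def health_loop(sock: socket.socket, stop: threading.Event) -> None:
    """Serve one health connection at a time; close every socket on exit."""
    conn = None
    sock.settimeout(0.05)  # short timeout so the loop can notice `stop`
    try:
        while not stop.is_set():
            if conn is None:
                try:
                    conn, _addr = sock.accept()
                    conn.settimeout(0.05)
                except socket.timeout:
                    continue
            try:
                data = conn.recv(64)
            except socket.timeout:
                continue
            if not data:
                # Peer closed the connection; go back to accepting.
                conn.close()
                conn = None
                continue
            conn.sendall(data)  # echo as a stand-in for a health reply
    finally:
        # Close the active connection as well as the listening socket.
        for s in (conn, sock):
            if s is not None:
                try:
                    s.close()
                except OSError:
                    pass
```

Keeping `conn` initialised to `None` before the `try` is what lets the `finally` block close it safely on every exit path.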
|
|
||
| conn, _addr = command_sock.accept() | ||
| with conn: | ||
| _run_command_loop(conn, manifest, inner_worker, manifest_inner_handles) |
command_sock.accept() is called without any timeout configured. If the parent process crashes, fails to connect, or disconnects before establishing the command connection, the session runner process will hang indefinitely on this call.
To prevent indefinite hangs, set a timeout on command_sock (e.g., using the remote_session_timeout_s value from the manifest, defaulting to 30 seconds) before calling accept().
timeout_s = float(manifest.get("remote_session_timeout_s", 30.0))
command_sock.settimeout(timeout_s)
conn, _addr = command_sock.accept()
conn.settimeout(None)
with conn:
    _run_command_loop(conn, manifest, inner_worker, manifest_inner_handles)
return 0
PR 1 of the remote L3 worker split stack (extracted from #866). Adds the design-contract documents and pre-split audit artifacts, and serves as the overview / source of truth for the implementation PRs that follow. Lands docs only, with no runtime behavior; later PRs must be reviewed against this contract (or the contract updated).
Split stack:
- #1006 docs: remote L3 worker contract (this PR: umbrella/overview)
- #1007 feat: remote import callable descriptors
- #1008 feat: remote L3 wire codec
- #1009 feat: remote endpoint eligibility scheduling
- #1010 feat: remote L3 endpoint facade
- #1011 feat: remote L3 Python session runtime
Documents:
- Remote L3 worker model and host/session/worker responsibilities.
- Protocol surface: frames, TASK/COMPLETION/CONTROL/HEALTH, reserved fields.
- Buffer and transport ownership, import, release, and future transport work.
- Scheduler, orchestrator, worker-manager, task-flow, and hierarchical runtime doc updates so remote endpoints fit the existing runtime model.
- Split-and-audit plan plus audit artifacts: compliance matrix, split map, risk register, verification checklist, and future-work list.
Contract boundaries established for the stack:
- Separates required behavior from explicitly unsupported behavior; reserved and unsupported protocol paths must fail explicitly.
- Records the review boundaries for callable identity, wire codec, scheduler eligibility, C++ remote endpoint, and Python session runtime.
Future work (out of this split unless a later dedicated PR changes the contract): A2 RoCE / A3 HCCS / A5 UB HCOMM hardware profiles, and Remote CommDomain support.
PR 3 of the remote L3 worker split stack. Adds the versioned cross-host frame protocol (codec only) that #1010 (C++ endpoint) and #1011 (Python session) build on. Transport-neutral and self-contained: encode/decode + struct definitions + tests; no sockets, endpoint, or session loop yet.
Adds:
- C++ `remote_wire.{h,cpp}`: SLR3 FrameHeader + payload codecs for HELLO, TASK, COMPLETION, CONTROL, CONTROL_REPLY, and remote buffer export/import/release. Canonical little-endian field encoding, with no raw POD memcpy of CallConfig/ContinuousTensor onto the wire.
- Python `remote_l3_protocol.py`: byte-symmetric codec plus thin read_frame/send_frame socket helpers.
- `types.h`: wire-serialized Remote* structs (RemoteAddressSpace, RemoteBufferHandle/Export, RemoteTensorDesc/Ref/Sidecar, RemoteTaskArgsSidecar).
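To illustrate what field-by-field little-endian encoding means in practice, here is a minimal Python sketch. The header layout below (4-byte magic, `u16` version, `u16` frame type, `u32` payload length) is an assumption made for the example; the actual SLR3 `FrameHeader` fields are not shown in this conversation.

```python
import struct

# Hypothetical header layout (an assumption, not the real SLR3 FrameHeader):
# 4-byte magic, u16 version, u16 frame type, u32 payload length, little-endian.
_HEADER = struct.Struct("<4sHHI")
MAGIC = b"SLR3"


def encode_frame(version: int, frame_type: int, payload: bytes) -> bytes:
    """Encode each field explicitly so the bytes do not depend on host layout."""
    return _HEADER.pack(MAGIC, version, frame_type, len(payload)) + payload


def decode_frame(buf: bytes) -> tuple[int, int, bytes]:
    """Decode a frame, failing explicitly on malformed input."""
    if len(buf) < _HEADER.size:
        raise ValueError("truncated frame header")
    magic, version, frame_type, length = _HEADER.unpack_from(buf)
    if magic != MAGIC:
        raise ValueError("bad magic")
    payload = buf[_HEADER.size:_HEADER.size + length]
    if len(payload) != length:
        raise ValueError("truncated frame payload")
    return version, frame_type, payload
```

Packing with an explicit `<` format avoids the padding and byte-order differences that a raw struct copy would carry onto the wire.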
PR 5 of the remote L3 worker split stack. Adds the parent-side network leg that plugs into #1009's endpoint scheduling: a second WorkerEndpoint implementation that drives a remote L3 child over a socket, alongside the existing LocalMailboxEndpoint. Transport-agnostic and exercised against an in-memory fake; no live remote daemon yet (the session runner / callee lands in #1011).
Adds:
- RemoteL3Transport: pure-virtual transport seam (submit_frame / wait_for_reply / shutdown) so the endpoint is independent of how bytes cross the wire (sim TCP today, HCOMM later, fake in tests).
- RemoteL3SocketTransport: TCP transport with a command lane plus an independent health lane, HELLO-READY handshake before the endpoint is schedulable, non-blocking connect and poll-based I/O honoring the configured timeout, background health monitor that fails in-flight commands on link loss.
- RemoteL3Endpoint: encodes a dispatched task into an SLR3 TASK frame, blocks for COMPLETION, and maps it to WorkerCompletion (remote error -> TASK_FAILURE, transport error -> ENDPOINT_FAILURE, matching #1009's poison semantics); all control verbs (register / malloc / copy / export / import) ride CONTROL/CONTROL_REPLY over a single-flight OrderedCommandLane. Rejects bare host pointers / missing sidecars before sending.
- Worker::add_remote_l3_socket factory (registers the endpoint before init, validated under WorkerManager's endpoint-id uniqueness check) and the Python bindings for the remote_* control surface.
- C++ unit tests via a FakeRemoteTransport (no real socket).
Actionable comments posted: 11
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
python/simpler/orchestrator.py (1)
261-284: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Reject `RemoteTensorRef` sidecars on SUB submits.
`TaskArgs.add_tensor(RemoteTensorRef, ...)` stores a sidecar and inserts a placeholder `ContinuousTensor` with pointer `0`. NEXT_LEVEL local submits reject that, but `submit_sub()` and `submit_sub_group()` forward the placeholder without sidecar materialization, so Python sub workers can receive invalid tensor metadata.
🐛 Proposed fix
 def submit_sub(self, callable_handle: Any, args: TaskArgs | None = None):
@@
     if args is None:
         args = TaskArgs()
+    if _remote_sidecar_for(args) is not None:
+        raise TypeError("RemoteTensorRef is only supported for RemoteCallable NEXT_LEVEL submits")
     digest, kind, target_namespace, _eligible_endpoint_ids = _require_handle(
         callable_handle,
@@
 def submit_sub_group(self, callable_handle: Any, args_list: list):
     """Submit a group of SUB tasks (N TaskArgs → N workers, 1 DAG node)."""
+    for args in args_list:
+        if not isinstance(args, TaskArgs):
+            raise TypeError("SUB group submit expects TaskArgs")
+        if _remote_sidecar_for(args) is not None:
+            raise TypeError("RemoteTensorRef is only supported for RemoteCallable NEXT_LEVEL submits")
     digest, kind, target_namespace, _eligible_endpoint_ids = _require_handle(
         callable_handle,
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@python/simpler/orchestrator.py` around lines 261 - 284, The submit_sub() and submit_sub_group() methods are forwarding TaskArgs to sub workers without validating or rejecting RemoteTensorRef sidecars. Add validation logic in both methods to inspect the TaskArgs (or args_list for submit_sub_group) and reject submissions that contain RemoteTensorRef sidecars before calling self._o.submit_sub() or self._o.submit_sub_group(). This validation should prevent sub workers from receiving invalid tensor metadata with placeholder pointers.
🧹 Nitpick comments (5)
python/simpler/remote_l3_worker.py (2)
124-139: 💤 Low value
Unreachable return value in `serve` function.
The function signature declares `-> int` but the `while True` loop never terminates normally, so no value is ever returned. This is fine for a daemon entry point, but consider either removing the return type annotation or adding a return statement after the loop for clarity if there's ever a clean shutdown path.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@python/simpler/remote_l3_worker.py` around lines 124 - 139, The serve function is declared with a return type annotation of -> int but contains a while True loop that never terminates normally, making the return type unreachable and misleading. Remove the return type annotation from the function signature since this is a daemon entry point that runs indefinitely and does not return a value under normal circumstances.
63-74: 💤 Low value
Consider adding a timeout to prevent indefinite blocking.
If the session runner subprocess stalls without crashing or sending a ready payload, `os.read(fd, 1)` will block indefinitely. Consider using `select.select` with a timeout or setting the fd to non-blocking mode with a poll loop.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@python/simpler/remote_l3_worker.py` around lines 63 - 74, The _read_runner_ready function uses os.read in an infinite loop that blocks indefinitely if the session runner subprocess stalls. Replace the current blocking read pattern with select.select that includes a timeout parameter, or alternatively set the file descriptor to non-blocking mode and implement a poll loop with timeout handling. This ensures the function will raise an appropriate exception (such as a timeout error) rather than hanging indefinitely when the subprocess fails to send the ready payload within a reasonable time window.
python/simpler/remote_l3_session.py (3)
837-842: 💤 Low value
Global handle registry cleared unconditionally on command loop exit.
The finally block clears `_INNER_HANDLES` at module level. This is correct for the current single-session-per-process design, but the global clear could be surprising if the design ever changes. Consider adding a comment noting this is intentional for the single-session model.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@python/simpler/remote_l3_session.py` around lines 837 - 842, The finally block unconditionally clears the module-level _INNER_HANDLES registry without any explanation of this behavior. Add a comment above the _INNER_HANDLES.clear() call documenting that this global clear is intentional for the current single-session-per-process design, noting that this behavior could be surprising if the design ever changes to support multiple sessions. This will help future maintainers understand the architectural intent.
354-361: 💤 Low value
Suppressed exceptions during rollback are intentional but could benefit from logging.
The `try-except-pass` pattern during rollback cleanup is correct to ensure all handles are unregistered even if individual unregister calls fail. However, silently swallowing these exceptions could hide issues during debugging. Consider logging at debug level.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@python/simpler/remote_l3_session.py` around lines 354 - 361, The inner try-except block that catches BaseException during cleanup of inner_worker.unregister(handle) is silently swallowing exceptions with just a pass statement. Replace the pass statement with a debug-level log message that captures and logs the exception details, ensuring that cleanup continues for all handles even when individual unregister calls fail, but without losing visibility into errors that occur during the rollback process.
403-403: ⚡ Quick win
Add `strict=True` to `zip()` for defensive validation.
Although lines 398-399 validate that the lengths match, adding `strict=True` provides defense-in-depth and documents the intent that the iteration must cover all elements.
♻️ Proposed fix
- for tensor, sidecar in zip(args.tensor_metadata, args.remote_desc):
+ for tensor, sidecar in zip(args.tensor_metadata, args.remote_desc, strict=True):
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@python/simpler/remote_l3_session.py` at line 403, Add the `strict=True` parameter to the `zip()` function call that iterates over `args.tensor_metadata` and `args.remote_desc`. This parameter will raise a ValueError if the two iterables have different lengths, providing defensive validation and documenting the intent that the iteration must cover all elements equally, complementing the length validation checks that already exist in the code.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@python/simpler/task_interface.py`:
- Around line 439-529: The current implementation uses id(args)-keyed storage in
the global _REMOTE_TASK_ARGS_STORAGE dictionary, which causes memory leaks when
TaskArgs.clear() is not called and allows object ID reuse to attach stale
sidecars to unrelated objects. Replace the process-wide dictionary keyed by
id(args) with a WeakKeyDictionary from the weakref module, or implement a
finalizer using weakref.finalize() that automatically cleans up the storage when
each TaskArgs object is garbage collected. Update _storage_for_remote_task_args,
_task_args_add_tensor, _task_args_clear, and _remote_sidecar_for to use this new
mechanism so storage is tied to the actual TaskArgs object lifetime rather than
relying on explicit clear() calls.
- Around line 418-430: The host_inline method does not validate that the
provided payload size matches the expected size derived from the shape and dtype
parameters. To fix this, calculate the expected number of bytes by multiplying
all dimensions in the shape tuple and then multiplying by the size of the dtype,
then validate that len(data) matches this expected byte count. If they do not
match, raise a ValueError with a descriptive message before creating the
RemoteBufferHandle. This validation should be added at the beginning of the
host_inline method to ensure consistent task metadata.
In `@python/simpler/worker.py`:
- Around line 2019-2056: Remote control function calls like
remote_prepare_register and remote_commit_register can raise exceptions that
bypass the current error handling logic which only checks result.ok. Wrap each
remote control call in a try-except block to catch any raised exceptions, append
the error details to the errors list in the same format as non-OK results, and
ensure that the cleanup branches using _remote_abort_prepared and
_remote_unregister_committed execute regardless of whether the error came from
result.ok being False or from an exception being raised. This ensures
uncertain_hashids are marked appropriately and cleanup occurs consistently
across all failure paths.
- Around line 1681-1727: The remote_import method currently acquires the owner
import reference after calling self._worker.remote_import(), which creates the
remote mapping. If the owner handle was released between remote_export() and
remote_import(), the _acquire_import_ref() call will raise an error after the
remote import already exists, leaving an orphaned import. Move the
owner_handle._acquire_import_ref() call to execute before the
self._worker.remote_import() call to validate and acquire the reference early,
and ensure that if the remote import call fails, the acquired reference is
properly rolled back to maintain consistency.
- Around line 2744-2765: The remote session initialization loop starting with
the enumeration of self._remote_worker_specs opens and registers sessions before
_initialized is set to true. If an exception occurs during this process after
some sessions have been opened, those partial sessions remain in
self._remote_sessions and self._worker without being cleaned up since close()
returns early when _initialized is false. Wrap the entire remote session opening
and registration loop in a try-except block that catches any exceptions, and in
the except handler, close the self._worker, clear self._remote_sessions, and any
remote sockets that were added, then re-raise the exception to ensure proper
cleanup of partially initialized state before allowing the exception to
propagate.
- Around line 1776-1798: The method `_flush_pending_remote_frees()` clears the
pending lists at the beginning before processing, so if
`_send_remote_release_import()` or `_send_remote_free()` raise an exception,
unprocessed handles will be lost. Wrap the calls to
`_send_remote_release_import()` and `_send_remote_free()` in try-except blocks
to catch exceptions, then append any failed handles back to their respective
pending lists (self._pending_remote_import_releases or
self._pending_remote_buffer_frees) to ensure they are retried on the next flush
cycle instead of being silently dropped.
- Around line 1377-1386: The endpoint_id calculation in add_remote_worker()
depends on the current count of local workers in self._next_level_workers, which
can change if local workers are added later, causing previously assigned remote
IDs to become invalid. Add a guard check at the beginning of the
add_remote_worker() method to ensure no local workers have been added yet by
checking if self._next_level_workers is empty, and raise a TypeError if local
workers already exist, ensuring remote endpoint IDs remain stable and
independent of subsequent local worker additions.
- Around line 1445-1464: The _build_remote_manifest method currently sets
listen_host to 0.0.0.0 for non-loopback daemons, which exposes the session
command/health ports on all network interfaces. Change the listen_host
assignment logic to default to a specific bind address like 127.0.0.1 instead of
0.0.0.0, and only use 0.0.0.0 if there is an explicit configuration option
(likely in self._config) that explicitly enables wildcard binding. This ensures
remote session ports are not exposed on all interfaces by default.
- Around line 1748-1762: The _capture_remote_sidecar_refs method has an
atomicity issue where if _acquire_slot_ref() raises an exception on a later
handle, the earlier handles that were already acquired remain in the captured
list without being released, causing a resource leak. To fix this, wrap the
handle acquisition logic in a try-except block that catches any exception from
_acquire_slot_ref(), and if an exception occurs, iterate through all previously
captured handles and call their release method (or equivalent cleanup) before
re-raising the exception to ensure atomicity.
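The all-or-nothing acquisition described for `_capture_remote_sidecar_refs` can be sketched as follows. `FakeHandle` and `capture_all` are hypothetical names for illustration; the real handle API (`_acquire_slot_ref()` and its release counterpart) may differ.

```python
class FakeHandle:
    """Hypothetical handle with a slot refcount; not the project's RemoteBufferHandle."""

    def __init__(self, name: str, live: bool = True) -> None:
        self.name = name
        self.live = live
        self.refs = 0

    def acquire(self) -> None:
        if not self.live:
            raise RuntimeError(f"handle {self.name} was already released")
        self.refs += 1

    def release(self) -> None:
        self.refs -= 1


def capture_all(handles: list) -> list:
    """Acquire every handle, or none: roll back earlier acquisitions on failure."""
    captured = []
    try:
        for handle in handles:
            handle.acquire()
            captured.append(handle)
    except BaseException:
        # Undo in reverse order, then let the original error propagate.
        for handle in reversed(captured):
            handle.release()
        raise
    return captured
```

A handle is appended to `captured` only after its acquire succeeds, so the rollback never releases a reference that was not taken.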
In `@tests/ut/py/test_callable_identity.py`:
- Line 1117: In the pytest.raises call with the match parameter containing the
pattern "RemoteL3Endpoint::run|socket closed|health lane", convert the match
string to a raw string by prefixing it with the `r` character before the opening
quote. This ensures that the pipe characters used as regex alternation
metacharacters are properly interpreted by the regex engine.
In `@tests/ut/py/test_task_interface.py`:
- Line 507: The regex pattern "Worker.remote_malloc" used in the pytest.raises
match parameter contains a period which is a regex metacharacter that matches
any character instead of a literal period. Fix this issue by converting both
occurrences (at lines 507 and 517 in the pytest.raises calls) to use raw strings
by prefixing with the r prefix so the period is treated as a literal character
match rather than a regex wildcard.
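One caveat worth checking before applying the two test fixes above: the `r` prefix only changes how Python parses backslashes in the literal; it does not escape regex metacharacters. A minimal sketch, using `re.search` because that is what `pytest.raises(match=...)` applies to the exception text:

```python
import re

pattern = r"Worker.remote_malloc"            # raw string, but "." still matches any character
escaped = re.escape("Worker.remote_malloc")  # the period is escaped to a literal

loose_match = re.search(pattern, "WorkerXremote_malloc") is not None
strict_match = re.search(escaped, "WorkerXremote_malloc") is not None
literal_match = re.search(escaped, "Worker.remote_malloc failed") is not None
same_string = r"a|b" == "a|b"  # the r prefix does not change "|" either
```

To match a literal period, the pattern needs `\.` (written conveniently inside a raw string) or `re.escape`.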
---
Outside diff comments:
In `@python/simpler/orchestrator.py`:
- Around line 261-284: The submit_sub() and submit_sub_group() methods are
forwarding TaskArgs to sub workers without validating or rejecting
RemoteTensorRef sidecars. Add validation logic in both methods to inspect the
TaskArgs (or args_list for submit_sub_group) and reject submissions that contain
RemoteTensorRef sidecars before calling self._o.submit_sub() or
self._o.submit_sub_group(). This validation should prevent sub workers from
receiving invalid tensor metadata with placeholder pointers.
---
Nitpick comments:
In `@python/simpler/remote_l3_session.py`:
- Around line 837-842: The finally block unconditionally clears the module-level
_INNER_HANDLES registry without any explanation of this behavior. Add a comment
above the _INNER_HANDLES.clear() call documenting that this global clear is
intentional for the current single-session-per-process design, noting that this
behavior could be surprising if the design ever changes to support multiple
sessions. This will help future maintainers understand the architectural intent.
- Around line 354-361: The inner try-except block that catches BaseException
during cleanup of inner_worker.unregister(handle) is silently swallowing
exceptions with just a pass statement. Replace the pass statement with a
debug-level log message that captures and logs the exception details, ensuring
that cleanup continues for all handles even when individual unregister calls
fail, but without losing visibility into errors that occur during the rollback
process.
- Line 403: Add the `strict=True` parameter to the `zip()` function call that
iterates over `args.tensor_metadata` and `args.remote_desc`. This parameter will
raise a ValueError if the two iterables have different lengths, providing
defensive validation and documenting the intent that the iteration must cover
all elements equally, complementing the length validation checks that already
exist in the code.
In `@python/simpler/remote_l3_worker.py`:
- Around line 124-139: The serve function is declared with a return type
annotation of -> int but contains a while True loop that never terminates
normally, making the return type unreachable and misleading. Remove the return
type annotation from the function signature since this is a daemon entry point
that runs indefinitely and does not return a value under normal circumstances.
- Around line 63-74: The _read_runner_ready function uses os.read in an infinite
loop that blocks indefinitely if the session runner subprocess stalls. Replace
the current blocking read pattern with select.select that includes a timeout
parameter, or alternatively set the file descriptor to non-blocking mode and
implement a poll loop with timeout handling. This ensures the function will
raise an appropriate exception (such as a timeout error) rather than hanging
indefinitely when the subprocess fails to send the ready payload within a
reasonable time window.
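A minimal sketch of the `select.select` variant, assuming a POSIX pipe and a newline-terminated ready payload. The framing, the function name `read_ready_line`, and the error messages are assumptions, not the daemon's actual protocol.

```python
import os
import select
import time


def read_ready_line(fd: int, timeout_s: float) -> bytes:
    """Read bytes from fd up to a newline, raising TimeoutError if the deadline passes."""
    deadline = time.monotonic() + timeout_s
    chunks = bytearray()
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError("runner did not send a ready payload in time")
        readable, _, _ = select.select([fd], [], [], remaining)
        if not readable:
            continue  # the next iteration re-checks the deadline and raises
        byte = os.read(fd, 1)
        if not byte:
            raise EOFError("runner closed the ready pipe before sending a payload")
        if byte == b"\n":
            return bytes(chunks)
        chunks += byte
```

Tracking one overall deadline, rather than a fresh timeout per read, keeps a runner that trickles bytes from extending the wait indefinitely.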
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 6a7e2fde-c6af-4835-8558-6104c1f59b00
📒 Files selected for processing (9)
- pyproject.toml
- python/simpler/callable_identity.py
- python/simpler/orchestrator.py
- python/simpler/remote_l3_session.py
- python/simpler/remote_l3_worker.py
- python/simpler/task_interface.py
- python/simpler/worker.py
- tests/ut/py/test_callable_identity.py
- tests/ut/py/test_task_interface.py
| @classmethod | ||
| def host_inline(cls, payload: bytes, *, shape: tuple[int, ...], dtype: DataType) -> RemoteTensorRef: | ||
| data = bytes(payload) | ||
| handle = RemoteBufferHandle( | ||
| endpoint_id=0, | ||
| owner_endpoint_id=0, | ||
| buffer_id=0, | ||
| generation=0, | ||
| address_space=RemoteAddressSpace.HOST_INLINE, | ||
| nbytes=len(data), | ||
| _internal_token=_REMOTE_BUFFER_HANDLE_TOKEN, | ||
| ) | ||
| return cls(handle=handle, offset=0, shape=shape, dtype=dtype, nbytes=len(data), inline_payload=data) |
Validate HOST_INLINE payload size against shape and dtype.
host_inline() currently passes nbytes=len(data), so a 1-byte payload with shape=(4,), dtype=UINT8 succeeds while the placeholder tensor advertises four elements. That creates inconsistent task metadata for remote materialization.
🐛 Proposed fix
 def host_inline(cls, payload: bytes, *, shape: tuple[int, ...], dtype: DataType) -> RemoteTensorRef:
     data = bytes(payload)
+    normalized_shape = tuple(int(x) for x in shape)
+    expected_nbytes = _remote_tensor_nbytes(normalized_shape, dtype)
+    if len(data) != expected_nbytes:
+        raise ValueError("HOST_INLINE payload length must match shape and dtype byte size")
     handle = RemoteBufferHandle(
         endpoint_id=0,
         owner_endpoint_id=0,
         buffer_id=0,
         generation=0,
         address_space=RemoteAddressSpace.HOST_INLINE,
         nbytes=len(data),
         _internal_token=_REMOTE_BUFFER_HANDLE_TOKEN,
     )
-    return cls(handle=handle, offset=0, shape=shape, dtype=dtype, nbytes=len(data), inline_payload=data)
+    return cls(handle=handle, offset=0, shape=normalized_shape, dtype=dtype, nbytes=expected_nbytes, inline_payload=data)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@python/simpler/task_interface.py` around lines 418 - 430, The host_inline
method does not validate that the provided payload size matches the expected
size derived from the shape and dtype parameters. To fix this, calculate the
expected number of bytes by multiplying all dimensions in the shape tuple and
then multiplying by the size of the dtype, then validate that len(data) matches
this expected byte count. If they do not match, raise a ValueError with a
descriptive message before creating the RemoteBufferHandle. This validation
should be added at the beginning of the host_inline method to ensure consistent
task metadata.
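The size check itself is a short calculation. A sketch under assumptions: the string dtype names and the `_ITEMSIZE` table are hypothetical; the real code would derive element sizes from its `DataType` enum (the proposed fix above calls a `_remote_tensor_nbytes` helper for this).

```python
import math

# Hypothetical element sizes; the real code would derive these from its DataType enum.
_ITEMSIZE = {"uint8": 1, "float16": 2, "float32": 4}


def expected_nbytes(shape: tuple, dtype: str) -> int:
    """Bytes needed for a dense tensor: product of dimensions times element size."""
    return math.prod(int(dim) for dim in shape) * _ITEMSIZE[dtype]


def check_inline_payload(payload: bytes, shape: tuple, dtype: str) -> bytes:
    """Reject payloads whose length disagrees with shape and dtype."""
    data = bytes(payload)
    want = expected_nbytes(shape, dtype)
    if len(data) != want:
        raise ValueError(f"payload is {len(data)} bytes, shape/dtype need {want}")
    return data
```

With this check, the 1-byte payload with `shape=(4,)` and a one-byte dtype from the comment is rejected up front instead of producing a four-element placeholder.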
_TASK_ARGS_ADD_TENSOR = TaskArgs.add_tensor
_TASK_ARGS_CLEAR = TaskArgs.clear
_REMOTE_TASK_ARGS_STORAGE: dict[int, _RemoteTaskArgsStorage] = {}
_REMOTE_TASK_ARGS_STORAGE_LOCK = threading.Lock()


def _sidecar_from_ref(storage: _RemoteTaskArgsStorage, ref: RemoteTensorRef) -> _RemoteTensorSidecar:
    handle = ref.handle
    inline_offset = 0
    inline_len = 0
    if handle.address_space == RemoteAddressSpace.HOST_INLINE:
        inline_offset = len(storage.inline_payload)
        inline_len = len(ref.inline_payload)
        storage.inline_payload.extend(ref.inline_payload)
    nbytes = ref.nbytes
    assert nbytes is not None

    desc = _RemoteTensorDesc(
        address_space=handle.address_space,
        owner_endpoint_id=0 if handle.address_space == RemoteAddressSpace.HOST_INLINE else handle.owner_endpoint_id,
        buffer_id=0 if handle.address_space == RemoteAddressSpace.HOST_INLINE else handle._buffer_id,
        offset=0 if handle.address_space == RemoteAddressSpace.HOST_INLINE else handle._offset + ref.offset,
        nbytes=int(nbytes),
        remote_addr=0 if handle.address_space == RemoteAddressSpace.HOST_INLINE else handle._remote_addr,
        rkey_or_token=0 if handle.address_space == RemoteAddressSpace.HOST_INLINE else handle._rkey_or_token,
        generation=0 if handle.address_space == RemoteAddressSpace.HOST_INLINE else handle._generation,
        inline_payload_offset=inline_offset,
        inline_payload_len=inline_len,
        flags=0,
    )
    handle_ref = None if handle.address_space == RemoteAddressSpace.HOST_INLINE else handle
    return _RemoteTensorSidecar(True, desc, handle_ref)

def _storage_for_remote_task_args(args: TaskArgs) -> _RemoteTaskArgsStorage:
    key = id(args)
    with _REMOTE_TASK_ARGS_STORAGE_LOCK:
        storage = _REMOTE_TASK_ARGS_STORAGE.get(key)
        if storage is None or len(storage.sidecars) != args.tensor_count():
            storage = _RemoteTaskArgsStorage([None for _ in range(args.tensor_count())], bytearray())
            _REMOTE_TASK_ARGS_STORAGE[key] = storage
        return storage


def _task_args_add_tensor(
    self: TaskArgs, tensor: ContinuousTensor | RemoteTensorRef, tag: TensorArgType = TensorArgType.INPUT
) -> None:
    if isinstance(tensor, RemoteTensorRef):
        storage = _storage_for_remote_task_args(self)
        metadata = ContinuousTensor.make(0, tensor.shape, tensor.dtype)
        _TASK_ARGS_ADD_TENSOR(self, metadata, tag)
        storage.sidecars.append(_sidecar_from_ref(storage, tensor))
        return
    if not isinstance(tensor, ContinuousTensor):
        raise TypeError("TaskArgs.add_tensor expects ContinuousTensor or RemoteTensorRef")
    _TASK_ARGS_ADD_TENSOR(self, tensor, tag)
    key = id(self)
    with _REMOTE_TASK_ARGS_STORAGE_LOCK:
        storage = _REMOTE_TASK_ARGS_STORAGE.get(key)
        if storage is not None:
            storage.sidecars.append(None)


def _task_args_clear(self: TaskArgs) -> None:
    _TASK_ARGS_CLEAR(self)
    with _REMOTE_TASK_ARGS_STORAGE_LOCK:
        _REMOTE_TASK_ARGS_STORAGE.pop(id(self), None)


TaskArgs.add_tensor = _task_args_add_tensor
TaskArgs.clear = _task_args_clear

def _remote_tensor_nbytes(shape: tuple[int, ...], dtype: DataType) -> int:
    element_count = int(prod(shape)) if shape else 1
    return element_count * int(get_element_size(dtype))


def _empty_remote_sidecar_for(args: TaskArgs) -> _RemoteTaskArgsSidecar:
    return _RemoteTaskArgsSidecar(tuple(None for _ in range(args.tensor_count())), b"")


def _remote_sidecar_for(args: TaskArgs) -> _RemoteTaskArgsSidecar | None:
    with _REMOTE_TASK_ARGS_STORAGE_LOCK:
        storage = _REMOTE_TASK_ARGS_STORAGE.get(id(args))
        if storage is None:
            return None
        if len(storage.sidecars) != args.tensor_count():
            _REMOTE_TASK_ARGS_STORAGE.pop(id(args), None)
            return None
        return _RemoteTaskArgsSidecar(tuple(storage.sidecars), bytes(storage.inline_payload))
Avoid id(args)-keyed sidecar storage leaking across TaskArgs lifetimes.
_REMOTE_TASK_ARGS_STORAGE is only cleared by TaskArgs.clear(), so ordinary short-lived TaskArgs objects with remote tensors leave sidecars and inline payloads in this process-wide dict. Since the key is just id(args), object-id reuse can also attach stale sidecars to a later unrelated TaskArgs. Tie the storage to the object lifetime instead, e.g. with a weak-key map/finalizer or native TaskArgs-owned sidecar storage.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@python/simpler/task_interface.py` around lines 439 - 529, The current
implementation uses id(args)-keyed storage in the global
_REMOTE_TASK_ARGS_STORAGE dictionary, which causes memory leaks when
TaskArgs.clear() is not called and allows object ID reuse to attach stale
sidecars to unrelated objects. Replace the process-wide dictionary keyed by
id(args) with a WeakKeyDictionary from the weakref module, or implement a
finalizer using weakref.finalize() that automatically cleans up the storage when
each TaskArgs object is garbage collected. Update _storage_for_remote_task_args,
_task_args_add_tensor, _task_args_clear, and _remote_sidecar_for to use this new
mechanism so storage is tied to the actual TaskArgs object lifetime rather than
relying on explicit clear() calls.
    def add_remote_worker(self, spec: RemoteWorkerSpec) -> int:
        if self._initialized:
            raise RuntimeError("Worker.add_remote_worker after init")
        if self.level < 4:
            raise TypeError("Worker.add_remote_worker: remote L3 workers require a level >= 4 parent")
        if not isinstance(spec, RemoteWorkerSpec):
            raise TypeError("Worker.add_remote_worker expects a RemoteWorkerSpec")
        endpoint_id = len(self._next_level_workers) + len(self._remote_worker_specs)
        self._remote_worker_specs.append(spec)
        return endpoint_id
Keep remote endpoint IDs stable after add_remote_worker().
add_remote_worker() returns an id based on the current local child count, but _init_hierarchical() recomputes remote ids from the final local child count. If a caller adds a local Worker after a remote worker, previously returned remote ids shift and pre-registered RemoteCallable entries can be omitted from the remote manifest or routed to the wrong NEXT_LEVEL slot.
🐛 Proposed guard
     def add_worker(self, worker: Worker) -> None:
         """Add a lower-level Worker as a NEXT_LEVEL child. Must be called before init().
@@
         if self._initialized:
             raise RuntimeError("Worker.add_worker() must be called before init()")
+        if self._remote_worker_specs:
+            raise RuntimeError("Worker.add_worker() must be called before add_remote_worker() so endpoint ids stay stable")
         if worker._initialized:
             raise RuntimeError("Child worker must not be initialized before add_worker()")
Also applies to: 2637-2652, 2744-2746
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@python/simpler/worker.py` around lines 1377 - 1386, The endpoint_id
calculation in add_remote_worker() depends on the current count of local workers
in self._next_level_workers, which can change if local workers are added later,
causing previously assigned remote IDs to become invalid. Add a guard check at
the beginning of the add_worker() method that raises a RuntimeError when
self._remote_worker_specs is non-empty, so no local worker can be added after a
remote worker has been registered. This keeps remote endpoint IDs stable and
independent of subsequent local worker additions.
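The ordering constraint can be illustrated with a reduced model. `ParentWorker` below is a hypothetical toy class, not the real `Worker`; it keeps only the two lists that determine endpoint ids.

```python
class ParentWorker:
    """Toy model of the parent: local children first, then remote specs."""

    def __init__(self):
        self._next_level_workers = []
        self._remote_worker_specs = []

    def add_worker(self, worker):
        # Adding a local child after a remote one would shift every remote id
        # that add_remote_worker() has already handed out.
        if self._remote_worker_specs:
            raise RuntimeError("add_worker() must be called before add_remote_worker()")
        self._next_level_workers.append(worker)

    def add_remote_worker(self, spec):
        # Remote ids are allocated after all local NEXT_LEVEL slots.
        endpoint_id = len(self._next_level_workers) + len(self._remote_worker_specs)
        self._remote_worker_specs.append(spec)
        return endpoint_id
```

With the guard in place, the id returned at registration time is the same id that init would recompute from the final local child count.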
    def _build_remote_manifest(self, *, spec: RemoteWorkerSpec, endpoint_id: int, session_id: int) -> dict[str, Any]:
        daemon_host, _daemon_port = self._parse_remote_endpoint(spec.endpoint)
        loopback = daemon_host in ("127.0.0.1", "localhost", "::1")
        return {
            "session_id": int(session_id),
            "parent_worker_level": int(self.level),
            "remote_worker_level": 3,
            "endpoint_id": int(endpoint_id),
            "platform": spec.platform,
            "runtime": spec.runtime,
            "device_ids": list(spec.device_ids),
            "num_sub_workers": int(spec.num_sub_workers),
            "heap_ring_size": self._config.get("remote_heap_ring_size", None),
            "transport": spec.transport,
            "listen_host": "127.0.0.1" if loopback else "0.0.0.0",
            "connect_host": daemon_host,
            "remote_task_dispatcher": self._remote_dispatcher_entries_for_endpoint(endpoint_id),
            "inner_l3_worker": [],
            "feature_flags": [],
        }
Avoid binding remote session command lanes to all interfaces by default.
For non-loopback daemons, the manifest sets listen_host to 0.0.0.0, exposing the session command/health ports on every interface. Default to a specific bind address, and require explicit opt-in for wildcard binding.
🛡️ Proposed fix
-        loopback = daemon_host in ("127.0.0.1", "localhost", "::1")
+        loopback = daemon_host in ("127.0.0.1", "localhost", "::1")
+        listen_host = str(
+            self._config.get(
+                "remote_session_listen_host",
+                "127.0.0.1" if loopback else daemon_host,
+            )
+        )
         return {
@@
-            "listen_host": "127.0.0.1" if loopback else "0.0.0.0",
+            "listen_host": listen_host,
🧰 Tools
🪛 Ruff (0.15.17)
[error] 1459-1459: Possible binding to all interfaces
(S104)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@python/simpler/worker.py` around lines 1445 - 1464, The
_build_remote_manifest method currently sets listen_host to 0.0.0.0 for
non-loopback daemons, which exposes the session command/health ports on all
network interfaces. Change the listen_host assignment logic to default to a
specific bind address like 127.0.0.1 instead of 0.0.0.0, and only use 0.0.0.0 if
there is an explicit configuration option (likely in self._config) that
explicitly enables wildcard binding. This ensures remote session ports are not
exposed on all interfaces by default.
Source: Linters/SAST tools
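The bind-address choice can be isolated into a small helper. This sketch follows the proposed fix's default (bind to the daemon host unless the config overrides it); `resolve_listen_host` and `LOOPBACK_HOSTS` are hypothetical names, and `remote_session_listen_host` is the config key taken from the proposal.

```python
LOOPBACK_HOSTS = ("127.0.0.1", "localhost", "::1")


def resolve_listen_host(daemon_host, config):
    """Pick the session bind address; wildcard binding only by explicit opt-in."""
    explicit = config.get("remote_session_listen_host")
    if explicit is not None:
        # The caller asked for a specific address, possibly "0.0.0.0".
        return str(explicit)
    # Default: loopback stays on loopback, otherwise bind the daemon's own address.
    return "127.0.0.1" if daemon_host in LOOPBACK_HOSTS else daemon_host
```

A deployment that really needs every interface would set the key to `"0.0.0.0"` in its config, which keeps the wider exposure visible at the call site.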
    def remote_import(
        self, exported: RemoteBufferExport, *, worker: int, access: str | int | None = None
    ) -> RemoteBufferHandle:
        if not isinstance(exported, RemoteBufferExport):
            raise TypeError("Worker.remote_import expects a RemoteBufferExport returned by remote_export")
        importer_endpoint_id = int(worker)
        self._require_remote_endpoint_started(importer_endpoint_id)
        flags = exported.access_flags if access is None else self._remote_access_flags(access)
        if flags & ~exported.access_flags:
            raise ValueError("Worker.remote_import requested access is not a subset of export access")
        assert self._worker is not None
        fields = self._worker.remote_import(
            importer_endpoint_id,
            exported.owner_endpoint_id,
            exported.buffer_id,
            exported.generation,
            int(exported.address_space),
            exported.offset,
            exported.nbytes,
            exported.export_id,
            exported.remote_addr,
            exported.rkey_or_token,
            exported.ub_ldst_va,
            exported.access_flags,
            exported.transport_profile,
            exported.transport_descriptor,
            flags,
        )
        owner_handle = exported._owner_handle
        imported = RemoteBufferHandle._from_imported_mapping(
            endpoint_id=int(fields[0]),
            owner_endpoint_id=int(fields[1]),
            buffer_id=int(fields[2]),
            generation=int(fields[3]),
            import_id=int(fields[4]),
            address_space=RemoteAddressSpace(int(fields[5])),
            nbytes=int(fields[6]),
            offset=int(fields[7]),
            remote_addr=int(fields[8]),
            rkey_or_token=int(fields[9]),
            ub_ldst_va=int(fields[10]),
            access_flags=int(fields[11]),
            owner_handle_ref=owner_handle,
        )
        if owner_handle is not None:
            owner_handle._acquire_import_ref()
        return imported
Acquire the owner import ref before creating the remote import.
remote_import() creates the remote mapping first and only then calls owner_handle._acquire_import_ref(). If the owner handle was released between remote_export() and remote_import(), _acquire_import_ref() raises after the remote import exists, leaving an import that Python never returns or releases. Validate/acquire the owner ref before the C++ import call, and roll it back if the remote import fails.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@python/simpler/worker.py` around lines 1681 - 1727, The remote_import method
currently acquires the owner import reference after calling
self._worker.remote_import(), which creates the remote mapping. If the owner
handle was released between remote_export() and remote_import(), the
_acquire_import_ref() call will raise an error after the remote import already
exists, leaving an orphaned import. Move the owner_handle._acquire_import_ref()
call to execute before the self._worker.remote_import() call to validate and
acquire the reference early, and ensure that if the remote import call fails,
the acquired reference is properly rolled back to maintain consistency.
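The acquire-then-import ordering with rollback can be sketched as follows. `OwnerHandle` and `import_with_owner_ref` are hypothetical stand-ins; the real handle's `_acquire_import_ref()` and `_release_import_ref()` are assumed to behave like the counter shown here.

```python
class OwnerHandle:
    """Toy owner handle that counts live imports and refuses use after release."""

    def __init__(self):
        self.released = False
        self.import_refs = 0

    def acquire_import_ref(self):
        if self.released:
            raise RuntimeError("owner handle already released")
        self.import_refs += 1

    def release_import_ref(self):
        self.import_refs -= 1


def import_with_owner_ref(owner, do_remote_import):
    """Take the owner ref first, then import; undo the ref if the import fails."""
    if owner is not None:
        owner.acquire_import_ref()  # fails before any remote state exists
    try:
        return do_remote_import()
    except BaseException:
        if owner is not None:
            owner.release_import_ref()  # roll back so the owner can still be freed
        raise
```

In this order a released owner is rejected before the remote mapping is created, so no import can exist that Python never returns or releases.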
    def _flush_pending_remote_frees(self) -> None:
        pending_imports = self._pending_remote_import_releases
        self._pending_remote_import_releases = []
        remaining_imports: list[RemoteBufferHandle] = []
        for handle in pending_imports:
            if handle._live_slot_refs > 0:
                remaining_imports.append(handle)
                continue
            self._send_remote_release_import(handle)
            if handle._owner_handle_ref is not None:
                handle._owner_handle_ref._release_import_ref()
                handle._owner_handle_ref = None
        self._pending_remote_import_releases.extend(remaining_imports)

        pending = self._pending_remote_buffer_frees
        self._pending_remote_buffer_frees = []
        remaining: list[RemoteBufferHandle] = []
        for handle in pending:
            if handle._live_slot_refs > 0 or handle._live_import_refs > 0:
                remaining.append(handle)
                continue
            self._send_remote_free(handle)
        self._pending_remote_buffer_frees.extend(remaining)
Preserve pending remote frees/releases when cleanup sends fail.
_flush_pending_remote_frees() clears both pending lists before issuing remote cleanup calls. If one _send_remote_release_import() or _send_remote_free() raises, the remaining handles are dropped from bookkeeping and will not be retried on the next run/close.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@python/simpler/worker.py` around lines 1776 - 1798, The method
`_flush_pending_remote_frees()` clears the pending lists at the beginning before
processing, so if `_send_remote_release_import()` or `_send_remote_free()` raise
an exception, unprocessed handles will be lost. Wrap the calls to
`_send_remote_release_import()` and `_send_remote_free()` in try-except blocks
to catch exceptions, then append any failed handles back to their respective
pending lists (self._pending_remote_import_releases or
self._pending_remote_buffer_frees) to ensure they are retried on the next flush
cycle instead of being silently dropped.
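One way to keep failed handles queued is to make the flush loop return what is left. `flush_pending` is a hypothetical helper with the busy check and the send call passed in as callables; it is a sketch of the retry bookkeeping, not the method from this PR.

```python
def flush_pending(pending, is_busy, send):
    """Try to release each handle once; return (still_pending, errors)."""
    remaining = []
    errors = []
    for handle in pending:
        if is_busy(handle):
            # Live references remain, so try again on a later flush.
            remaining.append(handle)
            continue
        try:
            send(handle)
        except Exception as exc:
            # Keep the handle queued so a failed send is retried, not dropped.
            remaining.append(handle)
            errors.append(exc)
    return remaining, errors
```

The caller would assign the returned list back to its pending list and decide whether to log or re-raise the collected errors, for example on close.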
        for endpoint_id in reg.eligible_endpoint_ids:
            result = self._worker.remote_prepare_register(
                endpoint_id,
                "REMOTE_TASK_DISPATCHER",
                reg.kind,
                payload,
                reg.digest,
            )
            if result.ok:
                prepared.append(endpoint_id)
            else:
                errors.append(f"{result.worker_type}[{result.worker_index}]: {result.error_message}")
                break
        if errors:
            cleanup_errors = self._remote_abort_prepared(prepared, reg)
            if cleanup_errors:
                with self._registry_lock:
                    self._uncertain_hashids.add(reg.digest)
            raise RuntimeError(self._format_register_partial_failure(reg.digest, errors, cleanup_errors))

        committed: list[int] = []
        for endpoint_id in reg.eligible_endpoint_ids:
            result = self._worker.remote_commit_register(
                endpoint_id,
                "REMOTE_TASK_DISPATCHER",
                reg.kind,
                reg.digest,
            )
            if result.ok:
                committed.append(endpoint_id)
            else:
                errors.append(f"{result.worker_type}[{result.worker_index}]: {result.error_message}")
                break
        if errors:
            cleanup_errors = self._remote_abort_prepared(
                [endpoint_id for endpoint_id in prepared if endpoint_id not in committed], reg
            )
            cleanup_errors.extend(self._remote_unregister_committed(committed, reg))
Route thrown remote-control errors through the partial-failure cleanup path.
These loops only handle result.ok == False. If remote_prepare_register, remote_commit_register, remote_abort_register, remote_unregister, or the unregister path raises after earlier endpoints succeeded, the cleanup branches are skipped or local state is removed without marking the digest uncertain. Wrap each remote control call, record the error, and run the same abort/unregister cleanup used for non-OK results.
Also applies to: 2073-2101, 2522-2534
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@python/simpler/worker.py` around lines 2019 - 2056, Remote control function
calls like remote_prepare_register and remote_commit_register can raise
exceptions that bypass the current error handling logic which only checks
result.ok. Wrap each remote control call in a try-except block to catch any
raised exceptions, append the error details to the errors list in the same
format as non-OK results, and ensure that the cleanup branches using
_remote_abort_prepared and _remote_unregister_committed execute regardless of
whether the error came from result.ok being False or from an exception being
raised. This ensures uncertain_hashids are marked appropriately and cleanup
occurs consistently across all failure paths.
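Converting a raised exception into the same non-OK shape lets one cleanup branch serve both failure modes. In this sketch `ControlResult`, `call_control`, and `prepare_all` are hypothetical names, and the result type is reduced to the two fields the loop needs.

```python
from dataclasses import dataclass


@dataclass
class ControlResult:
    ok: bool
    error_message: str = ""


def call_control(fn, *args):
    """Run one remote control call; turn a raised exception into a non-OK result."""
    try:
        return fn(*args)
    except Exception as exc:
        return ControlResult(ok=False, error_message=f"{type(exc).__name__}: {exc}")


def prepare_all(endpoint_ids, prepare):
    """Prepare endpoints in order, stopping at the first failure of either kind."""
    prepared, errors = [], []
    for endpoint_id in endpoint_ids:
        result = call_control(prepare, endpoint_id)
        if result.ok:
            prepared.append(endpoint_id)
        else:
            errors.append(f"endpoint[{endpoint_id}]: {result.error_message}")
            break
    return prepared, errors
```

The caller can then abort everything in `prepared` whenever `errors` is non-empty, regardless of whether the failure was reported or thrown. The commit, abort, and unregister loops could reuse the same wrapper.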
        local_next_level_count = len(self._next_level_workers)
        for i, spec in enumerate(self._remote_worker_specs):
            endpoint_id = local_next_level_count + i
            session_id = uuid.uuid4().int & ((1 << 63) - 1)
            if session_id == 0:
                session_id = 1
            timeout_s = float(self._config.get("remote_session_timeout_s", 30.0))
            session = self._open_remote_session(
                spec=spec, endpoint_id=endpoint_id, session_id=session_id, timeout_s=timeout_s
            )
            assert self._worker is not None
            self._worker.add_remote_l3_socket(
                endpoint_id,
                session_id,
                spec.transport,
                session.command_host,
                session.command_port,
                session.health_host,
                session.health_port,
                timeout_s,
            )
            self._remote_sessions.append(session)
Clean up partially opened remote sessions when init() fails.
Remote sessions are opened before _initialized is set. If opening or registering a later remote endpoint fails, earlier sessions stay in _remote_sessions/_worker, and close() returns immediately because _initialized is still false. Add an init-failure cleanup path that closes the partially constructed _Worker/remote sockets and clears session state before re-raising.
Also applies to: 3504-3506
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@python/simpler/worker.py` around lines 2744 - 2765, The remote session
initialization loop starting with the enumeration of self._remote_worker_specs
opens and registers sessions before _initialized is set to true. If an exception
occurs during this process after some sessions have been opened, those partial
sessions remain in self._remote_sessions and self._worker without being cleaned
up since close() returns early when _initialized is false. Wrap the entire
remote session opening and registration loop in a try-except block that catches
any exceptions, and in the except handler, close the self._worker, clear
self._remote_sessions, and any remote sockets that were added, then re-raise the
exception to ensure proper cleanup of partially initialized state before
allowing the exception to propagate.
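The init-failure path can be sketched with a reduced parent object. `Parent`, `FakeSession`, and the injected `open_session` callable are hypothetical; the real cleanup would also have to close the native `_Worker` and its remote sockets.

```python
class FakeSession:
    """Stand-in for an opened remote session."""

    def __init__(self, name):
        self.name = name
        self.closed = False

    def close(self):
        self.closed = True


class Parent:
    def __init__(self, open_session):
        self._open_session = open_session
        self._remote_sessions = []
        self._initialized = False

    def init(self, specs):
        try:
            for spec in specs:
                self._remote_sessions.append(self._open_session(spec))
        except BaseException:
            # close() would return early here because _initialized is still False,
            # so partial state has to be torn down explicitly before re-raising.
            self._cleanup_partial_init()
            raise
        self._initialized = True

    def _cleanup_partial_init(self):
        while self._remote_sessions:
            session = self._remote_sessions.pop()
            try:
                session.close()
            except Exception:
                pass  # best effort: keep closing the remaining sessions
```

Sessions are closed in reverse order of opening, and a failure while closing one session does not stop the others from being closed.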
        def parent_orch(orch, _args, cfg):
            orch.submit_next_level(handle, TaskArgs(), cfg, worker=endpoint)

        with pytest.raises(RuntimeError, match="RemoteL3Endpoint::run|socket closed|health lane"):
Use raw string for regex pattern with metacharacters.
The pattern contains pipe characters |, which are regex alternation metacharacters. The alternation is intended here, so mark the pattern as a raw string to make that explicit; the pattern's behavior does not change.
🔧 Proposed fix
-        with pytest.raises(RuntimeError, match="RemoteL3Endpoint::run|socket closed|health lane"):
+        with pytest.raises(RuntimeError, match=r"RemoteL3Endpoint::run|socket closed|health lane"):
             worker.run(parent_orch)
🧰 Tools
🪛 Ruff (0.15.17)
[warning] 1117-1117: Pattern passed to match= contains metacharacters but is neither escaped nor raw
(RUF043)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tests/ut/py/test_callable_identity.py` at line 1117, In the pytest.raises
call with the match parameter containing the pattern
"RemoteL3Endpoint::run|socket closed|health lane", convert the match string to a
raw string by prefixing it with the `r` character before the opening quote. The
pattern itself is unchanged; the raw prefix only makes explicit that the pipe
characters are intended as regex alternation.
Source: Linters/SAST tools
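A quick check shows why this change is stylistic. `pytest.raises(match=...)` applies `re.search` to the exception message, and a pattern without backslashes is the same string with or without the `r` prefix. The message text below is made up for illustration.

```python
import re

plain = "RemoteL3Endpoint::run|socket closed|health lane"
raw = r"RemoteL3Endpoint::run|socket closed|health lane"

# No backslashes are involved, so both literals denote the same string.
same_pattern = plain == raw

# Alternation: any one of the three alternatives may appear in the message.
matches = re.search(raw, "remote endpoint 1: socket closed") is not None
```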
        assert desc.rkey_or_token == 0xBEEF

    def test_remote_buffer_handle_is_opaque_to_public_constructor(self):
        with pytest.raises(TypeError, match="Worker.remote_malloc"):
Escape the period in the regex pattern to avoid metacharacter interpretation.
The pattern "Worker.remote_malloc" contains a period, which is a regex metacharacter (matches any character). Escape it as \. inside a raw string to ensure literal matching.
🔧 Proposed fix
-        with pytest.raises(TypeError, match="Worker.remote_malloc"):
+        with pytest.raises(TypeError, match=r"Worker\.remote_malloc"):
             RemoteBufferHandle(
                 endpoint_id=3,
                 buffer_id=11,
Apply the same fix to line 517:
-        with pytest.raises(TypeError, match="Worker.remote_malloc"):
+        with pytest.raises(TypeError, match=r"Worker\.remote_malloc"):
             RemoteBufferHandle(
                 endpoint_id=0,
Also applies to: 517-517
🧰 Tools
🪛 Ruff (0.15.17)
[warning] 507-507: Pattern passed to match= contains metacharacters but is neither escaped nor raw
(RUF043)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tests/ut/py/test_task_interface.py` at line 507, The regex pattern
"Worker.remote_malloc" used in the pytest.raises match parameter contains a
period which is a regex metacharacter that matches any character instead of a
literal period. Fix this issue in both occurrences (at lines 507 and 517 in the
pytest.raises calls) by escaping the period and using a raw string, i.e.
r"Worker\.remote_malloc". The r prefix alone does not change how the period is
matched; the backslash escape is what makes it a literal character.
Source: Linters/SAST tools
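The difference between the unescaped and escaped pattern is easy to demonstrate with `re.search`, which is what `pytest.raises(match=...)` uses. The two messages below are made up for illustration.

```python
import re

message = "use Worker.remote_malloc to create handles"
lookalike = "use WorkerXremote_malloc to create handles"

loose = "Worker.remote_malloc"                     # "." matches any character
strict = r"Worker\.remote_malloc"                  # "\." matches only a period
also_strict = re.escape("Worker.remote_malloc")    # escaping done by the library
```

With `loose`, both messages match, so the test would also pass on the wrong text. With `strict` or `also_strict`, only the message containing the literal `Worker.remote_malloc` matches.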
de883c8 to efe17e4
hw-native-sys#1010's local slice-bounds validation added a required handle_nbytes parameter to the remote_copy_to / remote_copy_from / remote_export nanobind bindings (validated_handle_nbytes in worker_bind.h). The Python Worker wrappers here were written against the pre-fix 6-arg signature and were never synced, so every remote-buffer copy raised "TypeError: incompatible function arguments" once this PR rebased onto the merged hw-native-sys#1010, failing the test_remote_sim_* cases.

Pass handle.nbytes (already validated against in each wrapper) as the trailing handle_nbytes argument, and update FakeRemoteCWorker in test_callable_identity.py to mirror the new binding signature.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
PR 1011 Remote L3 Review Fix Plan
Local planning note for PR #1011. This file is not intended to be committed
Recommended API Direction
Use the ordinary public
The public API should be:
    args = TaskArgs()
    args.add_tensor(remote_ref, TensorArgType.INPUT)
    orch.submit_next_level(remote_callable, args)
Do not add a public
The model is closer to Ray/PyTorch RPC in public API shape: ordinary argument
Do not add public
Finding 1: Access Flags Are Not Enforced
Current Problem
Imported handle access bits are documented but not enforced during submit. As a result, a read-only import can be submitted as
Design Decision
Keep
Do not add another public read/write flag to
Submit validation should check that the tag's minimum required access is a
Recommended mapping:
Remote
This keeps behavior close to local
Fix Plan
Suggested tests:
Finding 2: Public API Exposes Raw Transport Details
Current Problem
That conflicts with the documented opaque-handle model. It also leaks simulator
Design Decision
Keep remote handles/export descriptors opaque and controlled. Users may pass
Fix Plan
Suggested tests:
Finding 3: Public RemoteTaskArgs Is Documented But Not Implemented
Current Problem
Docs and the PR body claim a public
Design Decision
Do not add a public
This is both easier for user code and closer to established distributed APIs
Fix Plan
Suggested tests:
Finding 4: Bootstrap Manifest Hashid Is Not Revalidated
Current Problem
Dynamic CONTROL registration validates import targets and canonical hash ids,
That creates two different identity paths. A bad manifest can bind hashid A to
Severity And Scope
This is a reasonable
Fix Plan
Suggested tests:
Finding 5: Partial Init Failure Can Leak Remote Sessions
Current Problem
Severity And Scope
This is a lifecycle cleanup issue, not over-validation. It is a reasonable
Fix Plan
    try:
        self._init_hierarchical()
    except BaseException:
        self._cleanup_partial_init()
        raise
Suggested tests:
Hidden Sidecar Robustness
The current hidden sidecar strategy can remain internal, but it should not be a
Fix plan:
Pending Remote Free Error Handling
Keep this as a cleanup follow-up from the review even though it was not one of
Fix plan:
Acceptance Criteria
Summary
This is PR 6 of the remote L3 worker split stack. It adds the Python remote L3
session runtime and simulation-facing remote worker integration on top of the
documents, callable identity, wire codec, scheduler abstraction, and C++
endpoint facade introduced by the earlier PRs.
This is the top implementation PR in the split stack. At this point the stacked
branch content matches the corrected remote L3 feature branch, excluding
drop-only empty CI retrigger commits.
Relationship to PR #866 and split stack
This PR is part of the split stack extracted from #866.
Stack:
The top of the stack was verified to match the corrected
remote-l3-worker-design branch content from #866, excluding empty CI retrigger commits.
Stack position
remote-l3-split-cpp-remote-endpoint
remote-l3-split-python-runtime
remote-l3-split-cpp-remote-endpoint
Scope
This PR adds or updates:
simpler-remote-worker.
simpler-remote-l3-session.
RemoteCallable integration.
RemoteWorkerSpec integration.
RemoteBufferHandle, RemoteTensorRef, and RemoteTaskArgs integration.
and owner-free behavior.
behavior, remote buffer lifecycle, and CLI smoke paths.
Requirements covered
This PR covers the Python runtime and simulation requirements from the remote
L3 contract:
HELLO READY is used as the scheduling barrier before remote work is made eligible.
Why this is separate
This PR contains the highest-level process and lifecycle behavior. Keeping it
at the top of the stack means earlier PRs can be reviewed for their contracts
and lower-level invariants first, while this PR focuses on Python runtime
integration, process lifecycle, health, cleanup, and simulation behavior.
Important exclusions
This PR intentionally does not add:
Those remain future work per the contract documents.
Verification
Targeted verification already run during the split:
cases skipped by the sandbox.
simpler-remote-worker --help: passed.
simpler-remote-l3-session --help: passed.
Top-of-stack verification:
remote-l3-worker-design branch.
Reviewer notes
The main review focus should be lifecycle correctness:
started.
remote state.