Skip to content

Commit 0879945

Browse files
committed
Replace file-based IPC with pipe for child process results
Signed-off-by: Cong Wang <cwang@multikernel.io>
1 parent 4bbd118 commit 0879945

File tree

2 files changed

+59
-41
lines changed

2 files changed

+59
-41
lines changed

src/branching/agent/patterns.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ def _on_scope(sp: Path, _i: int = index) -> None:
422422
parent_cgroup=parent_cgroup,
423423
scope_callback=_on_scope if self._resource_limits else None,
424424
)
425-
if isinstance(ret, tuple):
425+
if isinstance(ret, (tuple, list)):
426426
success, score = ret
427427
else:
428428
success = bool(ret)
@@ -604,7 +604,7 @@ def __init__(
604604

605605
def _score(self, ret, path):
606606
"""Parse strategy return and apply optional evaluator."""
607-
if isinstance(ret, tuple):
607+
if isinstance(ret, (tuple, list)):
608608
success, score = ret
609609
else:
610610
success = bool(ret)

src/branching/process/runner.py

Lines changed: 57 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
import json
77
import os
8-
import traceback
98
from pathlib import Path
109
from typing import Any, Callable
1110

@@ -26,14 +25,14 @@ def run_in_process(
2625
) -> Any:
2726
"""Run *fn(*args)* in a forked child process, optionally with cgroup limits.
2827
29-
The child writes its return value (or exception info) to a result file
30-
inside *workspace*. The parent reads the result after the child exits.
28+
Results are passed back via an inherited pipe fd — no filesystem
29+
dependency, so this works even when the workspace is a FUSE mount
30+
inaccessible from the child's user namespace.
3131
3232
Args:
3333
fn: Callable to execute.
3434
args: Positional arguments for *fn*.
35-
workspace: Branch workspace path (used for the result file and as the
36-
BranchContext workspace).
35+
workspace: Branch workspace path (passed to BranchContext).
3736
limits: Optional resource limits applied to the child's cgroup.
3837
timeout: Maximum seconds to wait for the child.
3938
parent_cgroup: Optional parent cgroup for hierarchical nesting.
@@ -49,56 +48,75 @@ def run_in_process(
4948
abnormally without writing a result.
5049
Exception: Re-raised from the child if *fn* raised.
5150
"""
52-
result_path = workspace / ".branching_result"
51+
read_fd, write_fd = os.pipe()
5352

5453
def _target(ws_path: Path) -> None:
54+
os.close(read_fd)
5555
try:
5656
value = fn(*args)
57-
result_path.write_text(json.dumps({"ok": True, "value": repr(value)}))
58-
# Store the actual value via a second file so we can return
59-
# JSON-safe primitives directly and fall back to repr for the rest.
60-
_write_result(result_path, value)
57+
_write_result_fd(write_fd, {"ok": True, "value": value})
6158
except BaseException as exc:
62-
try:
63-
result_path.write_text(
64-
json.dumps({"ok": False, "error": f"{type(exc).__name__}: {exc}"})
65-
)
66-
except Exception:
67-
pass
59+
_write_result_fd(
60+
write_fd, {"ok": False, "error": f"{type(exc).__name__}: {exc}"}
61+
)
6862
raise
63+
finally:
64+
os.close(write_fd)
6965

70-
with BranchContext(
71-
_target, workspace=workspace, limits=limits,
72-
parent_cgroup=parent_cgroup,
73-
) as ctx:
74-
if scope_callback is not None and ctx.cgroup_scope is not None:
75-
scope_callback(ctx.cgroup_scope)
76-
try:
77-
ctx.wait(timeout=timeout)
78-
except ProcessBranchError:
79-
pass # handled below via result file
80-
81-
# Read result
82-
if not result_path.exists():
66+
try:
67+
with BranchContext(
68+
_target, workspace=workspace, limits=limits,
69+
parent_cgroup=parent_cgroup,
70+
) as ctx:
71+
os.close(write_fd)
72+
write_fd = -1 # prevent double-close in except branch
73+
if scope_callback is not None and ctx.cgroup_scope is not None:
74+
scope_callback(ctx.cgroup_scope)
75+
try:
76+
ctx.wait(timeout=timeout)
77+
except ProcessBranchError:
78+
pass # handled below via pipe
79+
except Exception:
80+
if write_fd >= 0:
81+
os.close(write_fd)
82+
raise
83+
84+
# Read result from pipe
85+
data = _read_result_fd(read_fd)
86+
os.close(read_fd)
87+
88+
if data is None:
8389
raise ProcessBranchError(
8490
"Child process did not produce a result (possibly OOM-killed)"
8591
)
8692

87-
try:
88-
data = json.loads(result_path.read_text())
89-
finally:
90-
result_path.unlink(missing_ok=True)
91-
9293
if data.get("ok"):
9394
return data.get("value")
9495

9596
raise ProcessBranchError(data.get("error", "unknown child error"))
9697

9798

98-
def _write_result(path: Path, value: Any) -> None:
99-
"""Write a JSON result file. Handles non-serializable values gracefully."""
99+
def _write_result_fd(fd: int, data: dict) -> None:
    """Serialize *data* as JSON and write the whole payload to pipe fd *fd*.

    If *data* cannot be JSON-encoded, the ``"value"`` entry (when present)
    is replaced by its ``repr()`` and encoding is retried, so the reader
    always receives valid JSON.

    Args:
        fd: Writable file descriptor (the write end of the result pipe).
            It is not closed here; the caller owns it.
        data: Result dict such as ``{"ok": True, "value": ...}`` or
            ``{"ok": False, "error": "..."}``.

    Raises:
        TypeError, ValueError: If *data* is still not serializable after
            the ``repr`` fallback.
        OSError: If writing to *fd* fails.
    """
    try:
        payload = json.dumps(data).encode()
    except (TypeError, ValueError):
        # Fall back to repr for non-JSON-serializable values
        fallback = dict(data)
        if "value" in fallback:
            fallback["value"] = repr(fallback["value"])
        payload = json.dumps(fallback).encode()

    # os.write() returns the number of bytes actually written, which may be
    # fewer than requested; keep writing until the payload is fully sent so
    # the reader never sees truncated JSON.
    # NOTE(review): a payload larger than the pipe buffer presumably blocks
    # here until the other end reads — confirm the reader drains the pipe.
    remaining = memoryview(payload)
    while remaining:
        written = os.write(fd, remaining)
        remaining = remaining[written:]
110+
111+
112+
def _read_result_fd(fd: int) -> dict | None:
    """Read a JSON result dict from pipe fd *fd*, consuming it until EOF.

    Args:
        fd: Readable file descriptor (the read end of the result pipe).
            It is not closed here; the caller owns it.

    Returns:
        The decoded result dict, or ``None`` when no usable result was
        received: nothing was written, the payload is not valid JSON
        (e.g. the writer died part-way through), or it did not decode to
        a dict.
    """
    chunks = []
    while True:
        chunk = os.read(fd, 65536)
        if not chunk:
            # Empty read means every write end has been closed (EOF)
            break
        chunks.append(chunk)
    if not chunks:
        return None
    try:
        data = json.loads(b"".join(chunks))
    except ValueError:
        # JSONDecodeError / UnicodeDecodeError: truncated or garbled
        # payload. Report "no result" rather than leaking a parse error.
        return None
    # Callers use data.get(...), so anything but a dict is unusable.
    return data if isinstance(data, dict) else None

0 commit comments

Comments
 (0)