fix(web): handle BrokenPipeError in SessionProcess.send_message #2324
Ricardo-M-L wants to merge 1 commit into
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 07c10f9dcf
    await self._emit_status(
        "error",
        reason="stdin_broken",
        detail=f"{e.__class__.__name__}: {e}",
    )
Clear in-flight prompts before emitting stdin_broken
When the broken stdin write is for a JSONRPCPromptMessage, the prompt id was already added to _in_flight_prompt_ids, but this new error path emits the stdin_broken status without clearing it. That makes the session report an error while is_busy remains true, so clients reacting immediately to the status can still be rejected by paths such as get_editable_session()'s busy check until _read_loop later catches up; the existing EOF/error paths explicitly clear in-flight ids before broadcasting for this reason. Clear _in_flight_prompt_ids before this _emit_status call.
    except (BrokenPipeError, ConnectionResetError) as e:
        # Subprocess died between our `start()` check above and the actual write.
        # `_read_loop` will eventually observe the exit and emit "stopped" /
        # "crashed", but right now the caller (FastAPI / websocket handler) would
        # otherwise see a raw exception propagate to the response. Emit an error
        # status so any attached websocket clients see the failure synchronously.
        logger.warning(
            f"send_message: subprocess stdin {e.__class__.__name__}; "
            f"process likely exited (returncode={process.returncode})"
        )
        await self._emit_status(
            "error",
            reason="stdin_broken",
            detail=f"{e.__class__.__name__}: {e}",
        )
🟡 In-flight prompt ID not cleaned up on BrokenPipeError leaves session stuck in 'busy' state
When send_message handles a JSONRPCPromptMessage, it adds the prompt ID to _in_flight_prompt_ids at line 647 and emits a "busy" status at line 649 before the stdin write attempt. If the write then fails with BrokenPipeError, the exception handler (lines 667-681) emits an "error" status but never removes the prompt ID from _in_flight_prompt_ids. This leaves is_busy returning True even though the prompt was never delivered to the subprocess.
The comment says _read_loop will eventually clean up, but there's a race window where _read_loop may have already completed its EOF handling (and called clear()) before the ID was added at line 647. In that case, no one clears the orphaned ID until the next start() call or the error-state recovery path in sessions.py:1011-1016. During this window, the session is simultaneously in "error" and "busy" states, and non-prompt messages (like cancel) that check is_busy will behave incorrectly.
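The inconsistent state described in this comment can be reproduced with a toy model. The sketch below is not the real SessionProcess: the SessionState class, its methods, and the assumption that is_busy is simply "the in-flight set is non-empty" are all hypothetical stand-ins used to show why the error path needs to clear the set.

```python
from dataclasses import dataclass, field


@dataclass
class SessionState:
    """Hypothetical stand-in for SessionProcess bookkeeping (not the real class)."""

    status: str = "idle"
    _in_flight_prompt_ids: set = field(default_factory=set)

    @property
    def is_busy(self) -> bool:
        # Assumption: busy means at least one prompt id is still registered.
        return bool(self._in_flight_prompt_ids)

    def register_prompt(self, prompt_id: str) -> None:
        # Same order as described in the review: register the id, then report "busy".
        self._in_flight_prompt_ids.add(prompt_id)
        self.status = "busy"

    def on_broken_stdin(self, *, clear_in_flight: bool) -> None:
        # The write failed; optionally drop the ids before reporting the error.
        if clear_in_flight:
            self._in_flight_prompt_ids.clear()
        self.status = "error"


def run(clear_in_flight: bool) -> tuple:
    """Register one prompt, fail the write, and return (status, is_busy)."""
    state = SessionState()
    state.register_prompt("prompt-1")
    state.on_broken_stdin(clear_in_flight=clear_in_flight)
    return state.status, state.is_busy
```

Without the cleanup, run(clear_in_flight=False) ends in status "error" while is_busy is still true, which is the combination the review warns about.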
Suggested change:

      except (BrokenPipeError, ConnectionResetError) as e:
          # Subprocess died between our `start()` check above and the actual write.
          # `_read_loop` will eventually observe the exit and emit "stopped" /
          # "crashed", but right now the caller (FastAPI / websocket handler) would
          # otherwise see a raw exception propagate to the response. Emit an error
          # status so any attached websocket clients see the failure synchronously.
          logger.warning(
              f"send_message: subprocess stdin {e.__class__.__name__}; "
              f"process likely exited (returncode={process.returncode})"
          )
    +     self._in_flight_prompt_ids.clear()
          await self._emit_status(
              "error",
              reason="stdin_broken",
              detail=f"{e.__class__.__name__}: {e}",
          )
If the worker subprocess dies between the start() check and the actual stdin write, process.stdin.write/drain raises BrokenPipeError (or ConnectionResetError). Previously this propagated raw to the caller (FastAPI / websocket handler); now we log it and emit an "error"/"stdin_broken" status so attached clients see the failure synchronously.

Also drop the prompt's id from _in_flight_prompt_ids on this path: for a JSONRPCPromptMessage the id was registered (and "busy" emitted) before the write, so without cleanup a failed write would leave the session wedged in is_busy forever. (Flagged in review by Codex and Devin.)

Adds a regression test asserting is_busy is cleared and status is error/stdin_broken after a broken-pipe write.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
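The regression test itself is not shown on this page. As a rough idea of its shape, here is a minimal sketch under stated assumptions: FakeSession, BrokenStdin, and send_prompt are hypothetical names invented for illustration, and the real test presumably exercises SessionProcess.send_message with the project's own fixtures instead.

```python
import asyncio


class BrokenStdin:
    """Fake stdin whose drain() fails the way a dead subprocess pipe might."""

    def write(self, data: bytes) -> None:
        pass  # accept the bytes; the failure surfaces on drain()

    async def drain(self) -> None:
        raise BrokenPipeError("simulated broken pipe")


class FakeSession:
    """Hypothetical miniature of the send path; not the real SessionProcess."""

    def __init__(self, stdin) -> None:
        self.stdin = stdin
        self.statuses = []  # recorded (state, reason) pairs
        self._in_flight_prompt_ids = set()

    @property
    def is_busy(self) -> bool:
        return bool(self._in_flight_prompt_ids)

    async def _emit_status(self, state, reason=None) -> None:
        self.statuses.append((state, reason))

    async def send_prompt(self, prompt_id: str, payload: bytes) -> None:
        # Register the prompt and report "busy" before attempting the write.
        self._in_flight_prompt_ids.add(prompt_id)
        await self._emit_status("busy")
        try:
            self.stdin.write(payload)
            await self.stdin.drain()
        except (BrokenPipeError, ConnectionResetError):
            # Drop the id first so is_busy is already false when clients react.
            self._in_flight_prompt_ids.discard(prompt_id)
            await self._emit_status("error", reason="stdin_broken")


def test_broken_pipe_clears_busy() -> None:
    session = FakeSession(BrokenStdin())
    asyncio.run(session.send_prompt("p1", b"{}\n"))
    assert not session.is_busy
    assert session.statuses[-1] == ("error", "stdin_broken")
```

The sketch drops only the failed prompt's id, following the wording of the commit message; the review suggestion clears the whole set instead, and which of the two the final code uses is not visible here.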
07c10f9 to d26405a
Thanks for the review. Addressed the in-flight-prompt issue both bots flagged: on the … Also rebased onto current …
What does this PR do?

SessionProcess.send_message in src/kimi_cli/web/runner/process.py writes to process.stdin and awaits drain() without guarding against the subprocess having exited between the start() call at the top of the method and the actual write at the bottom.

In a normal lifecycle _read_loop observes the exit and emits a "stopped" / "crashed" status. But there is a window (the subprocess crashes, hits OOM, or is killed by an external signal) where send_message() reaches the write before _read_loop runs. The result is a raw BrokenPipeError (or ConnectionResetError on some platforms) propagating up to the FastAPI / websocket handler, surfacing as a 500 to the client with no error status emitted to any attached websocket subscribers.

Fix

Wrap the write + drain pair in a try/except for BrokenPipeError and ConnectionResetError, log a warning with process.returncode, and emit an "error" status with reason="stdin_broken" so subscribers see the failure synchronously, instead of waiting on _read_loop to eventually emit a different terminal status.

No behavior change on the happy path. 14-line addition inside the existing function.
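The guard can be illustrated in isolation. This is a minimal sketch, not the code from process.py: guarded_write, RecordingStdin, DeadStdin, and demo are hypothetical names, the boolean return value is an assumption made so the outcome is easy to check, and the fake stdin objects stand in for a real subprocess pipe.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def guarded_write(stdin, data: bytes, emit_status) -> bool:
    """Write and drain; report a broken pipe as a status instead of raising."""
    try:
        stdin.write(data)
        await stdin.drain()
    except (BrokenPipeError, ConnectionResetError) as e:
        logger.warning("stdin write failed: %s", e.__class__.__name__)
        await emit_status(
            "error",
            reason="stdin_broken",
            detail=f"{e.__class__.__name__}: {e}",
        )
        return False
    return True


class RecordingStdin:
    """Healthy fake pipe: remembers what was written."""

    def __init__(self) -> None:
        self.written = b""

    def write(self, data: bytes) -> None:
        self.written += data

    async def drain(self) -> None:
        pass


class DeadStdin:
    """Fake pipe whose reader has gone away."""

    def write(self, data: bytes) -> None:
        raise BrokenPipeError("simulated")

    async def drain(self) -> None:
        pass


def demo(stdin):
    """Run one guarded write and return (ok, emitted status events)."""
    events = []

    async def emit_status(state, **fields):
        events.append((state, fields))

    ok = asyncio.run(guarded_write(stdin, b"ping\n", emit_status))
    return ok, events
```

With RecordingStdin the call succeeds and emits nothing, which matches the "no behavior change on the happy path" claim; with DeadStdin the exception is converted into a single "error" event carrying reason="stdin_broken".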
Repro

In a long-running web session, manually kill -9 the subprocess (or trigger an OOM in it) and immediately send a message. Without this PR, the next send_message() call raises and the websocket clients see only _read_loop's eventual status; the API call itself crashes with a 500.

Test plan

python3.11 -c "import ast; ast.parse(open('.../process.py').read())": passes
try