Skip to content

fix(web): handle BrokenPipeError in SessionProcess.send_message#2324

Open
Ricardo-M-L wants to merge 1 commit into
MoonshotAI:mainfrom
Ricardo-M-L:contrib/fix-send-message-broken-pipe
Open

fix(web): handle BrokenPipeError in SessionProcess.send_message#2324
Ricardo-M-L wants to merge 1 commit into
MoonshotAI:mainfrom
Ricardo-M-L:contrib/fix-send-message-broken-pipe

Conversation

@Ricardo-M-L

@Ricardo-M-L Ricardo-M-L commented May 19, 2026

Copy link
Copy Markdown

What does this PR do?

SessionProcess.send_message in src/kimi_cli/web/runner/process.py writes to process.stdin and awaits drain() without guarding against the subprocess having exited between the start() call at the top of the method and the actual write at the bottom:

process.stdin.write((message + "\n").encode("utf-8"))
await process.stdin.drain()

In a normal lifecycle _read_loop observes the exit and emits a "stopped" / "crashed" status. But there is a window — subprocess crashes, hits OOM, or is killed by an external signal — where send_message() reaches the write before _read_loop runs. The result is a raw BrokenPipeError (or ConnectionResetError on some platforms) propagating up to the FastAPI / websocket handler, surfacing as a 500 to the client with no error status emitted to any attached websocket subscribers.

Fix

Wrap the write + drain pair in a try/except for BrokenPipeError and ConnectionResetError, log a warning with process.returncode, and emit an "error" status with reason="stdin_broken" so subscribers see the failure synchronously — instead of waiting on _read_loop to eventually emit a different terminal status.

No behavior change on the happy path. 14-line addition inside the existing function.

Repro

In a long-running web session, manually kill -9 the subprocess (or trigger an OOM in it) and immediately send a message. Without this PR, the next send_message() call raises and the websocket clients see only _read_loop's eventual status; the API call itself crashes with a 500.

Test plan

  • python3.11 -c "import ast; ast.parse(open('.../process.py').read())" — passes
  • Manual code review for happy-path equivalence — the original two-line write/drain is preserved unchanged inside the try

Open in Devin Review

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 07c10f9dcf

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment on lines +677 to +681
await self._emit_status(
"error",
reason="stdin_broken",
detail=f"{e.__class__.__name__}: {e}",
)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Clear in-flight prompts before emitting stdin_broken

When the broken stdin write is for a JSONRPCPromptMessage, the prompt id was already added to _in_flight_prompt_ids, but this new error path emits the stdin_broken status without clearing it. That makes the session report an error while is_busy remains true, so clients reacting immediately to the status can still be rejected by paths such as get_editable_session()'s busy check until _read_loop later catches up; the existing EOF/error paths explicitly clear in-flight ids before broadcasting for this reason. Clear _in_flight_prompt_ids before this _emit_status call.

Useful? React with 👍 / 👎.

@devin-ai-integration devin-ai-integration Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 1 potential issue.

View 3 additional findings in Devin Review.

Open in Devin Review

Comment on lines +667 to +681
except (BrokenPipeError, ConnectionResetError) as e:
# Subprocess died between our `start()` check above and the actual write.
# `_read_loop` will eventually observe the exit and emit "stopped" /
# "crashed", but right now the caller (FastAPI / websocket handler) would
# otherwise see a raw exception propagate to the response. Emit an error
# status so any attached websocket clients see the failure synchronously.
logger.warning(
f"send_message: subprocess stdin {e.__class__.__name__}; "
f"process likely exited (returncode={process.returncode})"
)
await self._emit_status(
"error",
reason="stdin_broken",
detail=f"{e.__class__.__name__}: {e}",
)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 In-flight prompt ID not cleaned up on BrokenPipeError leaves session stuck in 'busy' state

When send_message handles a JSONRPCPromptMessage, it adds the prompt ID to _in_flight_prompt_ids at line 647 and emits a "busy" status at line 649 before the stdin write attempt. If the write then fails with BrokenPipeError, the exception handler (lines 667-681) emits an "error" status but never removes the prompt ID from _in_flight_prompt_ids. This leaves is_busy returning True even though the prompt was never delivered to the subprocess.

The comment says _read_loop will eventually clean up, but there's a race window where _read_loop may have already completed its EOF handling (and called clear()) before the ID was added at line 647. In that case, no one clears the orphaned ID until the next start() call or the error-state recovery path in sessions.py:1011-1016. During this window, the session is simultaneously in "error" and "busy" states, and non-prompt messages (like cancel) that check is_busy will behave incorrectly.

Suggested change
except (BrokenPipeError, ConnectionResetError) as e:
# Subprocess died between our `start()` check above and the actual write.
# `_read_loop` will eventually observe the exit and emit "stopped" /
# "crashed", but right now the caller (FastAPI / websocket handler) would
# otherwise see a raw exception propagate to the response. Emit an error
# status so any attached websocket clients see the failure synchronously.
logger.warning(
f"send_message: subprocess stdin {e.__class__.__name__}; "
f"process likely exited (returncode={process.returncode})"
)
await self._emit_status(
"error",
reason="stdin_broken",
detail=f"{e.__class__.__name__}: {e}",
)
except (BrokenPipeError, ConnectionResetError) as e:
# Subprocess died between our `start()` check above and the actual write.
# `_read_loop` will eventually observe the exit and emit "stopped" /
# "crashed", but right now the caller (FastAPI / websocket handler) would
# otherwise see a raw exception propagate to the response. Emit an error
# status so any attached websocket clients see the failure synchronously.
logger.warning(
f"send_message: subprocess stdin {e.__class__.__name__}; "
f"process likely exited (returncode={process.returncode})"
)
self._in_flight_prompt_ids.clear()
await self._emit_status(
"error",
reason="stdin_broken",
detail=f"{e.__class__.__name__}: {e}",
)
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

If the worker subprocess dies between the start() check and the actual
stdin write, process.stdin.write/drain raises BrokenPipeError (or
ConnectionResetError). Previously this propagated raw to the caller
(FastAPI / websocket handler); now we log it and emit an
"error"/"stdin_broken" status so attached clients see the failure
synchronously.

Also drop the prompt's id from _in_flight_prompt_ids on this path: for a
JSONRPCPromptMessage the id was registered (and "busy" emitted) before the
write, so without cleanup a failed write would leave the session wedged in
is_busy forever. (Flagged in review by Codex and Devin.)

Adds a regression test asserting is_busy is cleared and status is
error/stdin_broken after a broken-pipe write.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
@Ricardo-M-L Ricardo-M-L force-pushed the contrib/fix-send-message-broken-pipe branch from 07c10f9 to d26405a Compare June 13, 2026 04:10
@Ricardo-M-L

Copy link
Copy Markdown
Author

Thanks for the review. Addressed the in-flight-prompt issue both bots flagged: on the BrokenPipeError/ConnectionResetError path, a JSONRPCPromptMessage id was registered in _in_flight_prompt_ids (and busy emitted) before the failed write, so the session stayed wedged in is_busy. The error path now drops that id before emitting stdin_broken, and I added a regression test in tests/web/test_session_error_recovery.py asserting is_busy is cleared and the status is error/stdin_broken.

Also rebased onto current main (was 15 commits behind); the source diff is just web/runner/process.py. CI should be green now — happy to squash if preferred.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant