Skip to content

Add streamable HTTP transport#41

Open
agentcooper wants to merge 2 commits into
coder:mainfrom
poolsideai:agentcooper/streamable-http
Open

Add streamable HTTP transport#41
agentcooper wants to merge 2 commits into
coder:mainfrom
poolsideai:agentcooper/streamable-http

Conversation

@agentcooper
Copy link
Copy Markdown
Contributor

Implementation is based on:

  1. ACP RFD: Streamable HTTP & WebSocket Transport
  2. Proposed changes to the Rust SDK (see ACP Zulip message)

The RFD is proposing streamable HTTP as a base network transport with an option to upgrade to WebSockets. I didn't implement WebSockets part yet, this will be a separate PR.

In our pool client we are trying it out and can successfully connect and talk to the Goose agent over HTTP:

CleanShot 2026-05-14 at 15 50 49@2x

@ThomasK33
Copy link
Copy Markdown
Member

@codex review
/coder-agents-review

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 6917c3d4c1

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread acphttp/rpc.go Outdated
Comment on lines +128 to +135
case "session/cancel",
"session/close",
"session/load",
"session/prompt",
"session/resume",
"session/set_config_option",
"session/set_mode",
"session/set_model":
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Require Acp-Session-Id for session/fork

When clients use the unstable session/fork method, its params include a required sessionId (UnstableForkSessionRequest.SessionId), so it needs the same Acp-Session-Id enforcement/routing as the other session-scoped POSTs. Leaving it out here means the HTTP server accepts a fork POST without the session header and records the response on the connection-scoped stream, which breaks streamable-HTTP clients that rely on the header contract for session-scoped methods and can deliver the fork response on the wrong stream.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown

@coder-agents-review coder-agents-review Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Solid transport implementation. The pipe-bridge architecture is pragmatic given the SDK's existing io.Reader/io.Writer contract, the routing logic (connection vs session streams, session/load special-casing, pre-subscribe buffering) is well thought through, and the E2E test wiring both sides through real SDK connections is a strong trust signal. The RFD adherence is thorough.

Severity count: 1 P1, 8 P2, 13 P3, 5 Nit.

The P1 is convergent across 10 reviewers: outboundStream.push silently drops messages to slow subscribers, which can permanently hang the client SDK with zero diagnostic. The structural fix (close the subscriber on channel overflow so the client reconnects instead of hanging forever) is surgical and several reviewers independently converged on it.

The P2s cluster around three themes: (1) routing correctness (session header used without checking IsSessionScoped, missing session/fork in the method list), (2) resource lifecycle (zombie connections after agent death, unbounded connection creation, session stream accumulation), and (3) test gaps on error paths and the replay buffer.

"The stdio path applies backpressure through the pipe: a slow reader slows the writer. The HTTP transport's fire-and-forget push removes that backpressure and introduces silent data loss. The trade-off is reasonable for SSE, but the implicit reliability downgrade relative to stdio is undocumented." (Pariston)

🤖 This review was automatically generated with Coder Agents.

return
}
for _, sub := range s.subscribers {
select {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 [DEREM-4] Silent message drop to slow subscribers can permanently hang the client SDK.

When a subscriber's 128-entry channel is full, push does a non-blocking send and silently discards the message. If the SSE handler is blocked on Fprintf/Flush (TCP backpressure, slow client, reverse proxy buffering), the channel fills. Once it overflows, JSON-RPC responses to client requests are discarded. The client SDK blocks indefinitely waiting for a response that was dropped. No log, no subscriber teardown, no way for either side to detect the gap. The response is already consumed from pending by takePendingRoute in the router, so there is no replay path.

The drop-over-block tradeoff protects the router goroutine from stalling all streams on one slow client. That's the right instinct, but the failure mode needs to be visible. (Ryosuke)

The structural fix: on overflow, call sub.closeDone() to tear down the stream. The SSE handler already checks sub.done in its select and will exit cleanly. This converts a silent protocol hang into a visible disconnect that the client's reconnect logic already handles. Data is still lost, but a silent hang becomes a detectable reconnect. (Ryosuke P1, Knov P2, Hisoka P2, Killua P2, Chopper P2, Meruem P2, Knuckle P2, Mafuuu P2)

🤖

// Observe the agent-side connection's lifecycle so if the agent
// goroutines die (peer closed, unrecoverable error), we tear the
// connection down.
go func() {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 [DEREM-5] Watcher goroutine leaks zombie connections in the server map.

The watcher calls c.shutdown() when the agent-side connection dies, but never calls s.removeConn(c.id). The connection struct has no back-reference to the server. Every client that disconnects without sending DELETE (crash, timeout, network drop) leaves a zombie entry in s.connections. The map grows without bound until the server is restarted or Close() is called.

Fix: capture s in the closure and call s.removeConn(c.id) before c.shutdown(). (Kite)

🤖

Comment thread acphttp/server/handlers.go Outdated
// stream alongside session/new: the client hasn't necessarily opened
// the session-scoped GET when it issues session/load, so the
// connection stream is the only place it is guaranteed to land.
if sessionHeader != "" && envelope.Method != "session/load" {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 [DEREM-6] Response routing uses raw Acp-Session-Id header regardless of whether the method is session-scoped.

The condition sessionHeader != "" && envelope.Method != "session/load" consults the raw header, not IsSessionScoped. A malformed (or adversarial) session/new POST that includes Acp-Session-Id: some-id passes the "missing header" guard (IsSessionScoped returns false for session/new), but lands in the session-route branch here because sessionHeader is non-empty. The response gets pushed to a session stream the client isn't listening on. Client hangs.

The well-behaved client in this PR never triggers this, but this is a public transport API.

Fix: gate session routing on IsSessionScoped:

if acphttp.IsSessionScoped(envelope.Method) && envelope.Method != "session/load" && sessionHeader != "" {

(Knov)

🤖

Comment thread acphttp/rpc.go Outdated
Comment on lines +126 to +135
func IsSessionScoped(method string) bool {
switch method {
case "session/cancel",
"session/close",
"session/load",
"session/prompt",
"session/resume",
"session/set_config_option",
"session/set_mode",
"session/set_model":
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 [DEREM-7] IsSessionScoped omits session/fork (and other unstable methods with required sessionId).

The function's doc says it lists "every agent-side request type whose params include a required 'sessionId' field." The comment at lines 119-122 justifies including session/set_model (also unstable) so "transports speaking the unstable protocol get correct header behaviour out of the box." session/fork (UnstableForkSessionRequest in types_gen.go) has the same shape: unstable, required sessionId.

Consequence: the server accepts a session/fork POST without Acp-Session-Id, and the response routing records routeConnection instead of routeSession, delivering the fork response on the wrong stream.

Melody also identified siblings: nes/close, nes/suggest, nes/accept, nes/reject, and five document/did* methods all carry sessionId in params and are absent. (Melody P2, Luffy P3, Zoro P3)

🤖

Comment thread acphttp/server/server.go
if s.closed {
return fmt.Errorf("httpserver: server closed")
}
s.connections[id] = c
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 [DEREM-8] No limit on concurrent connections; every initialize POST creates an agent, goroutines, and pipes with no cap.

addConn checks only s.closed. An attacker (or a misfiring reconnect loop) sends initialize in a loop. Each call runs Factory(ctx), allocates two io.Pipe pairs, a router goroutine, a lifecycle-watcher goroutine, plus the connection's outbound streams. Goroutines, file descriptors, and memory scale linearly with request rate. createConnection calls the factory before addConn, so even a hypothetical check in addConn would still execute the factory.

Fix: add a MaxConnections field to Config, enforce in createConnection before calling Factory, return 503 from handleInitialize when the limit is hit. (Kurapika, Knov, Pen Botter)

🤖

Comment thread acphttp/server/server_test.go Outdated

// --- JSON envelope helper used in tests ---

type jsonRPCMessage struct {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P3 [DEREM-2] jsonRPCMessage type is defined but unused. Silenced with var _ = jsonRPCMessage{} and annotated "kept for future use." Dead test infrastructure should not ship; add it when needed. (Netero)

🤖

Comment thread acphttp/client/streams.go
func (t *Transport) runStream(ctx context.Context, sessionID, label string) {
// Backoff for reconnect attempts when the network drops but the
// transport is still alive (e.g. server restart).
backoff := 250 * time.Millisecond
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P3 [DEREM-3] Reconnect backoff never resets after a healthy stream connection. After several disconnect/reconnect cycles, backoff is permanently at 5s regardless of connection health. Reset backoff = 250 * time.Millisecond when a stream successfully opens and delivers at least one event. (Netero)

🤖

Comment thread acphttp/client/client.go
closedCh chan struct{}
}

// Dial creates a Transport. It does not perform any network I/O; the
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit [DEREM-33] Dial implies network I/O but the godoc says "it does not perform any network I/O." Standard Go naming: Dial means "connect now" (net.Dial, grpc.Dial, websocket.Dial). Consider NewTransport to match the type it returns and the zero-I/O contract. (Gon)

🤖

sessionStreams sync.Map // map[string]*outboundStream

// pending records where server → client responses to client → server
// POSTs must go, keyed by the canonical JSON-RPC id.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note [DEREM-35] pending map has no eviction for orphaned entries. If the agent drops a response (bug, crash mid-turn), the entry is never removed and accumulates for the connection's lifetime. Per-entry cost is low and the trigger requires an agent bug, so this is low priority. A periodic sweep or size cap would close the gap. (Knuckle)

🤖

Comment thread acphttp/server/server.go
// Handler returns an http.Handler that serves the ACP endpoint. Mount it
// at the root of an http.Server (routing is done internally so callers can
// mix with unrelated routes if they use an outer mux).
func (s *Server) Handler() http.Handler {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note [DEREM-36] Handler() users must call Close() manually to avoid goroutine leaks. Connection contexts are rooted in context.Background(), not any server-level context. ListenAndServe/Serve call Close() on context cancellation, but Handler() returns a raw http.Handler with no lifecycle hook. Worth noting in the doc comment. (Meruem)

🤖

@agentcooper
Copy link
Copy Markdown
Contributor Author

@ThomasK33 Hey Thomas, I addressed most of the review comments.

@ThomasK33
Copy link
Copy Markdown
Member

/coder-agents-review
@codex review

@coder-agents-review
Copy link
Copy Markdown

coder-agents-review Bot commented May 28, 2026

Chat: Review posted | View chat
Requested: 2026-05-28 09:10 UTC by @ThomasK33
Spend: $53.29 / $100.49

Review history
  • R2 (2026-05-28), 6 Nit, 2 Note, 1 P1, 9 P2, 17 P3, COMMENT. Review

deep-review v0.5.0 | Round 2 | 192e108..71420b2

Last posted: Round 2, 35 findings (1 P1, 9 P2, 17 P3, 6 Nit, 2 Note), COMMENT. Review

Finding inventory

Findings

# Sev Status Location Summary Round Reviewer Posted
DEREM-1 P3 Author fixed (71420b2) acphttp/server/server.go:55 pipeBufferSize declared but never referenced R1 Netero Yes
DEREM-2 P3 Author fixed (71420b2) acphttp/server/server_test.go:427 jsonRPCMessage type defined but unused R1 Netero Yes
DEREM-3 P3 Author fixed (71420b2) acphttp/client/streams.go:72 Reconnect backoff never resets after healthy connection R1 Netero Yes
DEREM-4 P1 Author fixed (71420b2) acphttp/server/connection.go:76 Silent message drop to slow subscribers can permanently hang client R1 Ryosuke P1, Knov P2, Hisoka P2, Killua P2, Chopper P2, Meruem P2, Knuckle P2, Mafuuu P2, Melody P3, Pariston P3 Yes
DEREM-5 P2 Author fixed (71420b2) acphttp/server/connection.go:233 Watcher goroutine leaks zombie connections in server map R1 Kite Yes
DEREM-6 P2 Author fixed (71420b2) acphttp/server/handlers.go:88 Response routing uses raw session header regardless of IsSessionScoped R1 Knov P1 Yes
DEREM-7 P2 Author fixed (71420b2) acphttp/rpc.go:126 IsSessionScoped omits session/fork and other unstable methods R1 Melody P2, Luffy P3, Zoro P3 Yes
DEREM-8 P2 Open acphttp/server/server.go:193 No limit on concurrent connections; unbounded resource allocation R1 Kurapika P1, Knov P2, Pen Botter P3 Yes
DEREM-9 P2 Open acphttp/client/client_test.go:231 No test for client transport error paths (5 paths in doInitialize) R1 Bisky P2, Chopper P3 Yes
DEREM-10 P2 Open acphttp/server/server_test.go:176 No test exercises pre-subscribe replay buffer R1 Bisky Yes
DEREM-11 P2 Author fixed (71420b2) acphttp/server/handlers.go:99 Error message strips all diagnostic context on 500 R1 Chopper P2, Leorio P2 Yes
DEREM-12 P2 Open acphttp/client/streams.go:75 Unbounded reconnect loop converts permanent failure into silent client hang R1 Pariston Yes
DEREM-13 P2 Author fixed (71420b2) acphttp/server/connection.go:69 Silent message drop in pre-buffer; comment promises warning that does not exist R1 Chopper P3, Hisoka P3, Kite P2, Zoro P3, Mafuuu Nit Yes
DEREM-14 P3 Author fixed (71420b2) acphttp/server/connection.go:192 createConnection accepts parent context.Context but ignores it R1 Chopper P3, Hisoka P3, Kite P3, Knov P3, Zoro P3, Razor P2, Meruem Nit Yes
DEREM-15 P3 Open acphttp/server/connection.go:168 Session-scoped streams never cleaned up until connection shutdown R1 Mafuuu P3, Zoro P4, Pen Botter P2, Knuckle P3 Yes
DEREM-16 P3 Author fixed (71420b2) acphttp/server/connection.go:366 routerWG never waited on in shutdown; goroutines outlive Close() R1 Ryosuke P2, Takumi P3 Yes
DEREM-17 P3 Author fixed (71420b2) acphttp/server/handlers.go:169 http.Flusher type assertion fails with middleware-wrapped ResponseWriter R1 Ging-Go Yes
DEREM-18 P3 Author fixed (71420b2) acphttp/client/client_test.go:84 require.NoError in handler goroutine kills handler not test R1 Bisky Yes
DEREM-19 P3 Author fixed (71420b2) acphttp/server/serve_test.go:111 time.Sleep(50ms) as synchronization in shutdown test R1 Bisky Yes
DEREM-20 P3 Author fixed (71420b2) acphttp/client/streams.go:113 Four error messages drop the httpclient: prefix R1 Leorio Yes
DEREM-21 P3 Author fixed (71420b2) acphttp/server/handlers.go:33 Error body swallowed on read failure R1 Leorio Yes
DEREM-22 P3 Open acphttp/client/streams.go:89 Clean server EOF permanently kills client SSE streams R1 Razor Yes
DEREM-23 P3 Open acphttp/client/streams.go:54 WaitGroup Add after Wait race in ensureSessionStream R1 Takumi P2 Yes
DEREM-24 P3 Open acphttp/rpc.go:80 CanonicalID naming diverges from root SDK canonicalJSONRPCIDKey R1 Robin Yes
DEREM-25 P3 Author fixed (71420b2) acphttp/server/serve.go:38 BuildHTTPServer sets zero timeouts including ReadHeaderTimeout R1 Kurapika P3, Pen Botter P3 Yes
DEREM-26 P3 Author fixed (71420b2) acphttp/client/rw.go:137 Double timeout on dispatch (context + http.Client.Timeout) R1 Razor Yes
DEREM-27 P3 Author fixed (71420b2) acphttp/server/handlers.go:111 Factory error reflected verbatim to HTTP client R1 Kurapika P2 Yes
DEREM-28 Nit Author fixed (71420b2) acphttp/server/handlers.go:36 bytes.TrimSpace avoids string round-trip allocation R1 Ging-Go Nit, Hisoka Nit, Killua P3, Knov Nit, Zoro Nit Yes
DEREM-29 Nit Author fixed (71420b2) acphttp/server/handlers.go:58 handlePost double-parses JSON via IsInitialize R1 Killua P3, Razor Nit, Ryosuke Nit, Zoro Nit Yes
DEREM-30 Nit Author fixed (71420b2) acphttp/client/streams.go:237 splitSSEField hand-rolls strings.Cut R1 Ging-Go Yes
DEREM-31 Nit Author fixed (71420b2) acphttp/server/connection.go:17 outboundSubscriber doc references nonexistent methods R1 Kite Nit, Mafuuu Nit Yes
DEREM-32 Nit Author fixed (71420b2) acphttp/rpc.go:126 IsSessionScoped uses string literals instead of generated constants R1 Robin P4, Zoro P3 Yes
DEREM-33 Nit Open acphttp/client/client.go:111 Dial implies network I/O but performs none R1 Gon Yes
DEREM-34 Nit Dropped by orchestrator (multiple bare literals, project style allows) various Magic channel/buffer capacity constants unnamed R1 Gon No
DEREM-35 Note Open acphttp/server/connection.go:171 pending map has no eviction for orphaned entries R1 Knuckle Yes
DEREM-36 Note Author fixed (71420b2) acphttp/server/server.go:123 Handler() users must call Close() manually to avoid goroutine leaks R1 Meruem Yes

Contested and acknowledged

(none)

Round log

Round 1

Panel (23 reviewers: Bisky, Chopper, Ging-Go, Gon, Hisoka, Killua, Kite, Knov, Kurapika, Leorio, Luffy, Mafu-san, Mafuuu, Melody, Meruem, Pariston, Razor, Robin, Ryosuke, Takumi, Zoro + wildcards Knuckle, Pen Botter). Netero 3 P3. Panel: 1 P1, 8 P2, 13 P3, 5 Nit, 2 Note. 1 dropped. Reviewed against 192e108..6917c3d.

Round 2

BLOCKED. 25 addressed, 10 silent. No review. Reviewed against 192e108..71420b2.

About deep-review

CRF = Coder Review Finding (P0-P4, Nit, Note)

Reviewer Focus
Bisky tests
Chopper ops/errors
Churn-guard change verification
Ging language modernization
Gon naming
Hisoka edge cases
Killua perf
Kite change integrity
Knov contracts
Knuckle SQL
Kurapika security
Leorio docs
Luffy product
Mafu-san process
Mafuuu contracts
Melody dispatch/pairing
Meruem structural
Nami frontend
Netero mechanical checks
Pariston premise testing
Pen-botter product gaps
Razor verification
Robin duplication
Ryosuke Go arch
Takumi concurrency
Zoro shape

🤖 Managed by Coder Agents.

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 71420b28f9

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread acphttp/client/streams.go
Comment on lines +95 to +96
if err == nil || errors.Is(err, context.Canceled) || t.isClosed() {
return
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Reconnect SSE streams after clean EOF

When an SSE GET ends normally, parseSSE returns nil and this branch permanently exits the stream. That can happen in this implementation when the server closes a slow subscriber via sub.done on outbound overflow (acphttp/server/connection.go says this relies on client reconnects), or when an intermediary cleanly closes an idle stream; after that the connection-scoped stream is never reopened, so later session/new/session/load responses are only buffered server-side and SDK callers can wait forever. Treat EOF as reconnectable unless the transport is closing.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown

@coder-agents-review coder-agents-review Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good progress: 25 of 35 findings addressed in 71420b2. The P1 (silent subscriber drop), both routing issues (DEREM-6, DEREM-7), the zombie connection leak (DEREM-5), and many P3/Nit fixes all look right. Thank you for the thorough response.

Further review is blocked until the remaining 10 findings receive a response (fix, acknowledge, or contest). The panel has not yet verified the fixes; that happens in the next unblocked round.

Silent P2 findings (need response before panel review proceeds):

  • DEREM-8 No limit on concurrent connections. Add MaxConnections to Config, or explain why deployment-level limiting is sufficient.
  • DEREM-9 No test for client transport error paths (5 paths in doInitialize). Add error-path tests, or explain why the happy-path coverage is sufficient.
  • DEREM-10 No test exercises the pre-subscribe replay buffer. Add a test that sends messages before a subscriber attaches, or explain.
  • DEREM-12 Unbounded reconnect loop converts permanent server failure into silent client hang. Add a max retry or configurable timeout, or explain why infinite retry is correct.

Silent P3 findings:

  • DEREM-15 Session-scoped streams never cleaned up until connection shutdown.
  • DEREM-22 Clean server EOF permanently kills client SSE streams with no reconnect.
  • DEREM-23 WaitGroup Add/Wait race in ensureSessionStream and openConnectionStream.
  • DEREM-24 CanonicalID naming diverges from root SDK's canonicalJSONRPCIDKey.

Silent Nit/Note findings:

  • DEREM-33 Dial naming implies network I/O.
  • DEREM-35 pending map has no eviction for orphaned entries.

For any finding you disagree with, a reply explaining why is sufficient to unblock. For findings you plan to defer, file a ticket and link it.

🤖 This review was automatically generated with Coder Agents.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants