Skip to content

Commit d727c30

Browse files
Propagate body-level errors as synthetic JSON-RPC responses
Change onErrorComplete to onErrorResume in sendMessage() so that body-level errors (DataBufferLimitException, parse errors, etc.) emit a synthetic JSON-RPC error response to the handler. This resolves pending responses in McpClientSession immediately instead of hanging until requestTimeout. Fixes #889
1 parent 3a78182 commit d727c30

File tree

2 files changed

+228
-4
lines changed

2 files changed

+228
-4
lines changed

mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,9 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
464464
final AtomicReference<Disposable> disposableRef = new AtomicReference<>();
465465
final McpTransportSession<Disposable> transportSession = this.activeSession.get();
466466

467+
// https://github.com/modelcontextprotocol/java-sdk/issues/889
468+
Object requestId = (sentMessage instanceof McpSchema.JSONRPCRequest req) ? req.id() : null;
469+
467470
var uri = Utils.resolveUri(this.baseUri, this.endpoint);
468471
String jsonBody = this.toString(sentMessage);
469472

@@ -636,12 +639,19 @@ else if (statusCode == BAD_REQUEST) {
636639
.retryWhen(authorizationErrorRetrySpec())
637640
.flatMap(jsonRpcMessage -> this.handler.get().apply(Mono.just(jsonRpcMessage)))
638641
.onErrorMap(CompletionException.class, t -> t.getCause())
639-
.onErrorComplete(t -> {
640-
// handle the error first
642+
.onErrorResume(t -> {
641643
this.handleException(t);
642-
// inform the caller of sendMessage
643644
deliveredSink.error(t);
644-
return true;
645+
if (requestId != null) {
646+
// Emit synthetic error so pending response is resolved
647+
logger.warn("Body-level error for request {}, emitting synthetic error response", requestId, t);
648+
McpSchema.JSONRPCResponse errorResponse = new McpSchema.JSONRPCResponse(
649+
McpSchema.JSONRPC_VERSION, requestId, null,
650+
new McpSchema.JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.INTERNAL_ERROR,
651+
"Transport error during response streaming: " + t.getMessage(), null));
652+
return this.handler.get().apply(Mono.just(errorResponse));
653+
}
654+
return Flux.empty();
645655
})
646656
.doFinally(s -> {
647657
logger.debug("SendMessage finally: {}", s);
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
/*
2+
* Copyright 2025-2026 the original author or authors.
3+
*/
4+
5+
package io.modelcontextprotocol.client.transport;
6+
7+
import java.io.IOException;
8+
import java.io.OutputStream;
9+
import java.net.InetSocketAddress;
10+
import java.nio.charset.StandardCharsets;
11+
import java.util.concurrent.CopyOnWriteArrayList;
12+
import java.util.concurrent.CountDownLatch;
13+
import java.util.concurrent.TimeUnit;
14+
import java.util.concurrent.atomic.AtomicBoolean;
15+
16+
import com.sun.net.httpserver.HttpServer;
17+
import io.modelcontextprotocol.server.transport.TomcatTestUtil;
18+
import io.modelcontextprotocol.spec.McpClientTransport;
19+
import io.modelcontextprotocol.spec.McpSchema;
20+
import io.modelcontextprotocol.spec.McpTransportException;
21+
import io.modelcontextprotocol.spec.ProtocolVersions;
22+
import org.junit.jupiter.api.AfterEach;
23+
import org.junit.jupiter.api.BeforeEach;
24+
import org.junit.jupiter.api.Nested;
25+
import org.junit.jupiter.api.Test;
26+
import org.junit.jupiter.api.Timeout;
27+
import reactor.test.StepVerifier;
28+
29+
import static org.assertj.core.api.Assertions.assertThat;
30+
31+
/**
32+
* Tests for body-level error handling in {@link HttpClientStreamableHttpTransport}.
33+
*
34+
* @author James Kennedy
35+
* @see <a href="https://github.com/modelcontextprotocol/java-sdk/issues/889">#889</a>
36+
*/
37+
@Timeout(15)
38+
public class HttpClientStreamableHttpTransportBodyErrorTest {
39+
40+
private static final int PORT = TomcatTestUtil.findAvailablePort();
41+
42+
private static final String HOST = "http://localhost:" + PORT;
43+
44+
private HttpServer server;
45+
46+
private McpClientTransport transport;
47+
48+
private final AtomicBoolean returnMalformedSse = new AtomicBoolean(false);
49+
50+
@BeforeEach
51+
void startServer() throws IOException {
52+
this.server = HttpServer.create(new InetSocketAddress(PORT), 0);
53+
54+
this.server.createContext("/mcp", exchange -> {
55+
String method = exchange.getRequestMethod();
56+
57+
if ("DELETE".equals(method)) {
58+
exchange.sendResponseHeaders(200, 0);
59+
exchange.close();
60+
return;
61+
}
62+
63+
if ("GET".equals(method)) {
64+
exchange.sendResponseHeaders(405, 0);
65+
exchange.close();
66+
return;
67+
}
68+
69+
if (this.returnMalformedSse.get()) {
70+
exchange.getResponseHeaders().set("Content-Type", "text/event-stream");
71+
exchange.sendResponseHeaders(200, 0);
72+
OutputStream os = exchange.getResponseBody();
73+
os.write("event: message\ndata: {not valid json\n\n".getBytes(StandardCharsets.UTF_8));
74+
os.flush();
75+
exchange.close();
76+
return;
77+
}
78+
79+
exchange.getResponseHeaders().set("Content-Type", "application/json");
80+
String response = "{\"jsonrpc\":\"2.0\",\"result\":{},\"id\":\"init-id\"}";
81+
exchange.sendResponseHeaders(200, response.length());
82+
exchange.getResponseBody().write(response.getBytes(StandardCharsets.UTF_8));
83+
exchange.close();
84+
});
85+
86+
this.server.setExecutor(null);
87+
this.server.start();
88+
89+
this.transport = HttpClientStreamableHttpTransport.builder(HOST).build();
90+
}
91+
92+
@AfterEach
93+
void stopServer() {
94+
if (this.server != null) {
95+
this.server.stop(0);
96+
}
97+
StepVerifier.create(this.transport.closeGracefully()).verifyComplete();
98+
}
99+
100+
@Nested
101+
class SseStream {
102+
103+
@Test
104+
void propagatesErrorOnMalformedSseData() {
105+
StepVerifier.create(transport.connect(msg -> msg)).verifyComplete();
106+
returnMalformedSse.set(true);
107+
108+
StepVerifier.create(transport.sendMessage(createRequest("req-123")))
109+
.expectError(McpTransportException.class)
110+
.verify();
111+
}
112+
113+
}
114+
115+
@Nested
116+
class JsonResponse {
117+
118+
@Test
119+
void emitsSyntheticErrorResponseOnMalformedJson() throws InterruptedException {
120+
CopyOnWriteArrayList<McpSchema.JSONRPCMessage> handlerMessages = new CopyOnWriteArrayList<>();
121+
CountDownLatch errorResponseLatch = new CountDownLatch(1);
122+
connectTransportWithErrorCapture(handlerMessages, errorResponseLatch);
123+
replaceServerWithMalformedJsonResponse();
124+
125+
StepVerifier.create(transport.sendMessage(createRequest("req-456"))).verifyComplete();
126+
127+
assertThat(errorResponseLatch.await(5, TimeUnit.SECONDS))
128+
.as("Handler should receive synthetic error response within 5 seconds")
129+
.isTrue();
130+
assertSingleSyntheticError(handlerMessages, "req-456");
131+
}
132+
133+
}
134+
135+
@Nested
136+
class Notification {
137+
138+
@Test
139+
void doesNotEmitSyntheticResponseOnBodyError() throws InterruptedException {
140+
CopyOnWriteArrayList<McpSchema.JSONRPCMessage> handlerMessages = new CopyOnWriteArrayList<>();
141+
StepVerifier.create(transport.connect(msg -> msg.doOnNext(handlerMessages::add))).verifyComplete();
142+
returnMalformedSse.set(true);
143+
McpSchema.JSONRPCNotification notification = new McpSchema.JSONRPCNotification(McpSchema.JSONRPC_VERSION,
144+
"notifications/cancelled", null);
145+
146+
StepVerifier.create(transport.sendMessage(notification))
147+
.expectError(McpTransportException.class)
148+
.verify();
149+
150+
Thread.sleep(500);
151+
assertThat(handlerMessages.stream()
152+
.filter(m -> m instanceof McpSchema.JSONRPCResponse resp && resp.error() != null)
153+
.toList()).isEmpty();
154+
}
155+
156+
}
157+
158+
private void connectTransportWithErrorCapture(CopyOnWriteArrayList<McpSchema.JSONRPCMessage> handlerMessages,
159+
CountDownLatch errorResponseLatch) {
160+
StepVerifier.create(this.transport.connect(msg -> msg.doOnNext(m -> {
161+
handlerMessages.add(m);
162+
if (m instanceof McpSchema.JSONRPCResponse resp && resp.error() != null) {
163+
errorResponseLatch.countDown();
164+
}
165+
}))).verifyComplete();
166+
}
167+
168+
private McpSchema.JSONRPCRequest createRequest(String requestId) {
169+
return new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, McpSchema.METHOD_INITIALIZE, requestId,
170+
new McpSchema.InitializeRequest(ProtocolVersions.MCP_2025_03_26,
171+
McpSchema.ClientCapabilities.builder().roots(true).build(),
172+
new McpSchema.Implementation("Test Client", "1.0.0")));
173+
}
174+
175+
private void replaceServerWithMalformedJsonResponse() {
176+
this.server.removeContext("/mcp");
177+
this.server.createContext("/mcp", exchange -> {
178+
String method = exchange.getRequestMethod();
179+
180+
if ("DELETE".equals(method)) {
181+
exchange.sendResponseHeaders(200, 0);
182+
exchange.close();
183+
return;
184+
}
185+
186+
if ("GET".equals(method)) {
187+
exchange.sendResponseHeaders(405, 0);
188+
exchange.close();
189+
return;
190+
}
191+
192+
exchange.getResponseHeaders().set("Content-Type", "application/json");
193+
byte[] malformed = "{not valid json".getBytes(StandardCharsets.UTF_8);
194+
exchange.sendResponseHeaders(200, malformed.length);
195+
exchange.getResponseBody().write(malformed);
196+
exchange.close();
197+
});
198+
}
199+
200+
private void assertSingleSyntheticError(CopyOnWriteArrayList<McpSchema.JSONRPCMessage> handlerMessages,
201+
String expectedRequestId) {
202+
var errorResponses = handlerMessages.stream()
203+
.filter(m -> m instanceof McpSchema.JSONRPCResponse resp && resp.error() != null)
204+
.map(m -> (McpSchema.JSONRPCResponse) m)
205+
.toList();
206+
207+
assertThat(errorResponses).hasSize(1);
208+
McpSchema.JSONRPCResponse errorResponse = errorResponses.get(0);
209+
assertThat(errorResponse.id()).isEqualTo(expectedRequestId);
210+
assertThat(errorResponse.error().code()).isEqualTo(McpSchema.ErrorCodes.INTERNAL_ERROR);
211+
assertThat(errorResponse.error().message()).contains("Transport error");
212+
}
213+
214+
}

0 commit comments

Comments
 (0)