Skip to content

Commit 9f2b105

Browse files
committed
test: close leaked SSE receive streams instead of gc-collecting them
SseServerTransport.connect_sse never closed sse_stream_reader after EventSourceResponse returned; the in-process bridge dropped its chunk reader when a request was cancelled before the response started. Close both at source so the interaction suite no longer needs a gc.collect() workaround, and pull one assert inside its async-with body to clear the last 3.11 coverage gap.
1 parent e0e8e57 commit 9f2b105

5 files changed

Lines changed: 30 additions & 45 deletions

File tree

src/mcp/server/sse.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ async def response_wrapper(scope: Scope, receive: Receive, send: Send):
179179
await EventSourceResponse(content=sse_stream_reader, data_sender_callable=sse_writer)(
180180
scope, receive, send
181181
)
182+
await sse_stream_reader.aclose()
182183
await read_stream_writer.aclose()
183184
await write_stream_reader.aclose()
184185
self._read_stream_writers.pop(session_id, None)

tests/interaction/_connect.py

Lines changed: 11 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77
(session ids, SSE encoding, session management) runs with no sockets, threads, or subprocesses.
88
"""
99

10-
import gc
11-
import warnings
1210
from collections.abc import AsyncIterator, Awaitable, Callable, Iterable
1311
from contextlib import AbstractAsyncContextManager, asynccontextmanager
1412
from typing import Any, Protocol
@@ -347,26 +345,14 @@ def httpx_client_factory(
347345
)
348346

349347
transport = sse_client(f"{BASE_URL}/sse", httpx_client_factory=httpx_client_factory)
350-
try:
351-
async with Client(
352-
transport,
353-
read_timeout_seconds=read_timeout_seconds,
354-
sampling_callback=sampling_callback,
355-
list_roots_callback=list_roots_callback,
356-
logging_callback=logging_callback,
357-
message_handler=message_handler,
358-
client_info=client_info,
359-
elicitation_callback=elicitation_callback,
360-
) as client:
361-
yield client
362-
finally:
363-
# SseServerTransport.connect_sse never closes its sse_stream_reader (handed to
364-
# sse_starlette.EventSourceResponse, which does not aclose() its content on cancel).
365-
# After teardown that reader is held only by a reference cycle through the connect_sse
366-
# frame and its task objects; collecting twice runs the cycle's finalizers and then
367-
# frees the reader while ResourceWarning is suppressed, instead of at an arbitrary
368-
# later GC under pytest's error filter. One pass suffices on 3.11+; 3.10 needs both.
369-
with warnings.catch_warnings():
370-
warnings.simplefilter("ignore", ResourceWarning)
371-
gc.collect()
372-
gc.collect()
348+
async with Client(
349+
transport,
350+
read_timeout_seconds=read_timeout_seconds,
351+
sampling_callback=sampling_callback,
352+
list_roots_callback=list_roots_callback,
353+
logging_callback=logging_callback,
354+
message_handler=message_handler,
355+
client_info=client_info,
356+
elicitation_callback=elicitation_callback,
357+
) as client:
358+
yield client

tests/interaction/transports/_bridge.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -151,11 +151,16 @@ async def run_application() -> None:
151151
await chunk_writer.aclose()
152152

153153
self._task_group.start_soon(run_application)
154-
await response_started.wait()
155-
if application_error is not None:
156-
# No response will be built, so close the reader the response body would have owned.
154+
try:
155+
await response_started.wait()
156+
if application_error is not None:
157+
raise application_error
158+
except BaseException:
159+
# No response will be built, so close the reader the response body would have owned
160+
# and tell the application its peer has gone away.
161+
client_disconnected.set()
157162
await chunk_reader.aclose()
158-
raise application_error
163+
raise
159164
return httpx.Response(
160165
status_code=response_status,
161166
headers=response_headers,

tests/interaction/transports/test_hosting_session.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -115,17 +115,16 @@ async def test_delete_terminates_the_session_and_subsequent_requests_return_404(
115115
json={"jsonrpc": "2.0", "id": 2, "method": "tools/list"},
116116
headers=base_headers(session_id=session_id),
117117
)
118-
119-
assert (post.status_code, post.json()) == snapshot(
120-
(
121-
404,
122-
{
123-
"jsonrpc": "2.0",
124-
"id": None,
125-
"error": {"code": -32600, "message": "Not Found: Session has been terminated"},
126-
},
118+
assert (post.status_code, post.json()) == snapshot(
119+
(
120+
404,
121+
{
122+
"jsonrpc": "2.0",
123+
"id": None,
124+
"error": {"code": -32600, "message": "Not Found: Session has been terminated"},
125+
},
126+
)
127127
)
128-
)
129128

130129

131130
@requirement("hosting:session:isolation")

tests/interaction/transports/test_sse.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77
through the suite's streaming ASGI bridge.
88
"""
99

10-
import gc
11-
import warnings
1210
from uuid import UUID, uuid4
1311

1412
import anyio
@@ -59,10 +57,6 @@ def httpx_client_factory(
5957
assert await client.send_ping() == snapshot(EmptyResult())
6058

6159
assert sse._read_stream_writers == {}
62-
# See connect_over_sse: collect the one stream sse_starlette never closes on disconnect.
63-
with warnings.catch_warnings():
64-
warnings.simplefilter("ignore", ResourceWarning)
65-
gc.collect()
6660

6761

6862
@requirement("transport:sse:post:session-routing")

0 commit comments

Comments
 (0)