From 23b3fbecfe47545ac6c237914459e21f86a194c1 Mon Sep 17 00:00:00 2001 From: Max Isbey <224885523+maxisbey@users.noreply.github.com> Date: Wed, 4 Mar 2026 14:48:00 +0000 Subject: [PATCH 1/2] tests: remove lax no cover pragmas by moving assertions before cancel On Python 3.11 with coverage 7.10.7 (lowest-direct), statements immediately following an async with block whose __aexit__ processes cancellation are intermittently not traced. Both sse_client and streamable_http_client call tg.cancel_scope.cancel() in their exit paths, and test_streamable_http_client_resumption has an explicit in-test cancel, so 11 lines across 6 tests were carrying pragmas. All of these lines check state that is fully populated before the cancellation fires, so they can move inside the blocks: - on_session_created fires when the endpoint SSE event arrives, before sse_client yields to the caller - session IDs and protocol versions are captured during initialize(), so headers for phase-2 reconnects can be built inside phase 1 - the resumption test's while loop only exits after the notification has been appended, and the server tool blocks on a lock, so the count is stable before cancel - resumption tokens are all captured before send_request returns Also drops two dead if-truthy checks where the value was already asserted not-None three lines earlier. --- tests/shared/test_sse.py | 19 ++++------ tests/shared/test_streamable_http.py | 55 ++++++++++++---------------- 2 files changed, 31 insertions(+), 43 deletions(-) diff --git a/tests/shared/test_sse.py b/tests/shared/test_sse.py index 7b2bc0a13..af7cbcf08 100644 --- a/tests/shared/test_sse.py +++ b/tests/shared/test_sse.py @@ -203,19 +203,15 @@ async def test_sse_client_basic_connection(server: None, server_url: str) -> Non @pytest.mark.anyio async def test_sse_client_on_session_created(server: None, server_url: str) -> None: - captured_session_id: str | None = None + captured: list[str] = [] - def on_session_created(session_id: str) -> None: - nonlocal captured_session_id - captured_session_id = session_id - - async with sse_client(server_url + "/sse", on_session_created=on_session_created) as streams: + async with sse_client(server_url + "/sse", on_session_created=captured.append) as streams: async with ClientSession(*streams) as session: result = await session.initialize() assert isinstance(result, InitializeResult) - - assert captured_session_id is not None # pragma: lax no cover - assert len(captured_session_id) > 0 # pragma: lax no cover + # Callback fires when the endpoint event arrives, before sse_client yields. + assert len(captured) == 1 + assert len(captured[0]) > 0 @pytest.mark.parametrize( @@ -248,8 +244,9 @@ def mock_extract(url: str) -> None: async with ClientSession(*streams) as session: result = await session.initialize() assert isinstance(result, InitializeResult) - - callback_mock.assert_not_called() # pragma: lax no cover + # Callback would have fired by now (endpoint event arrives before + # sse_client yields); if it hasn't, it won't. + callback_mock.assert_not_called() @pytest.fixture diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index 42b1a3698..3c511c250 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -1139,15 +1139,12 @@ async def test_streamable_http_client_session_termination(basic_server: None, ba assert len(captured_ids) > 0 captured_session_id = captured_ids[0] assert captured_session_id is not None + headers = {MCP_SESSION_ID_HEADER: captured_session_id} # Make a request to confirm session is working tools = await session.list_tools() assert len(tools.tools) == 10 - headers: dict[str, str] = {} # pragma: lax no cover - if captured_session_id: # pragma: lax no cover - headers[MCP_SESSION_ID_HEADER] = captured_session_id - async with create_mcp_http_client(headers=headers) as httpx_client2: async with streamable_http_client(f"{basic_server_url}/mcp", http_client=httpx_client2) as ( read_stream, @@ -1203,15 +1200,12 @@ async def mock_delete(self: httpx.AsyncClient, *args: Any, **kwargs: Any) -> htt assert len(captured_ids) > 0 captured_session_id = captured_ids[0] assert captured_session_id is not None + headers = {MCP_SESSION_ID_HEADER: captured_session_id} # Make a request to confirm session is working tools = await session.list_tools() assert len(tools.tools) == 10 - headers: dict[str, str] = {} # pragma: lax no cover - if captured_session_id: # pragma: lax no cover - headers[MCP_SESSION_ID_HEADER] = captured_session_id - async with create_mcp_http_client(headers=headers) as httpx_client2: async with streamable_http_client(f"{basic_server_url}/mcp", http_client=httpx_client2) as ( read_stream, @@ -1231,7 +1225,6 @@ async def test_streamable_http_client_resumption(event_server: tuple[SimpleEvent # Variables to track the state captured_resumption_token: str | None = None captured_notifications: list[types.ServerNotification] = [] - captured_protocol_version: str | int | None = None first_notification_received = False async def message_handler( # pragma: no branch @@ -1265,8 +1258,11 @@ async def on_resumption_token_update(token: str) -> None: assert len(captured_ids) > 0 captured_session_id = captured_ids[0] assert captured_session_id is not None - # Capture the negotiated protocol version - captured_protocol_version = result.protocol_version + # Build phase-2 headers now while both values are in scope + headers: dict[str, Any] = { + MCP_SESSION_ID_HEADER: captured_session_id, + MCP_PROTOCOL_VERSION_HEADER: result.protocol_version, + } # Start the tool that will wait on lock in a task async with anyio.create_task_group() as tg: # pragma: no branch @@ -1291,25 +1287,19 @@ async def run_tool(): while not first_notification_received or not captured_resumption_token: await anyio.sleep(0.1) + # The while loop only exits after first_notification_received=True, + # which is set by message_handler immediately after appending to + # captured_notifications. The server tool is blocked on its lock, + # so nothing else can arrive before we cancel. + assert len(captured_notifications) == 1 + assert isinstance(captured_notifications[0], types.LoggingMessageNotification) + assert captured_notifications[0].params.data == "First notification before lock" + # Reset for phase 2 before cancelling + captured_notifications.clear() + # Kill the client session while tool is waiting on lock tg.cancel_scope.cancel() - # Verify we received exactly one notification (inside ClientSession - # so coverage tracks these on Python 3.11, see PR #1897 for details) - assert len(captured_notifications) == 1 # pragma: lax no cover - assert isinstance(captured_notifications[0], types.LoggingMessageNotification) # pragma: lax no cover - assert captured_notifications[0].params.data == "First notification before lock" # pragma: lax no cover - - # Clear notifications and set up headers for phase 2 (between connections, - # not tracked by coverage on Python 3.11 due to cancel scope + sys.settrace bug) - captured_notifications = [] # pragma: lax no cover - assert captured_session_id is not None # pragma: lax no cover - assert captured_protocol_version is not None # pragma: lax no cover - headers: dict[str, Any] = { # pragma: lax no cover - MCP_SESSION_ID_HEADER: captured_session_id, - MCP_PROTOCOL_VERSION_HEADER: captured_protocol_version, - } - async with create_mcp_http_client(headers=headers) as httpx_client2: async with streamable_http_client(f"{server_url}/mcp", http_client=httpx_client2) as ( read_stream, @@ -2092,11 +2082,12 @@ async def on_resumption_token(token: str) -> None: assert isinstance(result.content[0], TextContent) assert "Completed 3 checkpoints" in result.content[0].text - # 4 priming + 3 notifications + 1 response = 8 tokens - assert len(resumption_tokens) == 8, ( # pragma: lax no cover - f"Expected 8 resumption tokens (4 priming + 3 notifs + 1 response), " - f"got {len(resumption_tokens)}: {resumption_tokens}" - ) + # 4 priming + 3 notifications + 1 response = 8 tokens. All tokens are + # captured before send_request returns, so this is safe to check here. + assert len(resumption_tokens) == 8, ( + f"Expected 8 resumption tokens (4 priming + 3 notifs + 1 response), " + f"got {len(resumption_tokens)}: {resumption_tokens}" + ) @pytest.mark.anyio From 88a791b23feb9bd28a5d79eaec3cb10205d5e1a4 Mon Sep 17 00:00:00 2001 From: Max Isbey <224885523+maxisbey@users.noreply.github.com> Date: Wed, 4 Mar 2026 15:29:29 +0000 Subject: [PATCH 2/2] Add no branch pragmas to phase-1 ClientSession blocks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The block-exit arcs from these three async with lines were never actually traced on 3.11 or 3.14 — on main, the arc destinations were lax-no-cover lines, so coverage silently excluded the arcs from counting. Removing those destinations in the previous commit exposed the latent gap. Adding no branch here is a smaller suppression than what we removed: the assertions that used to be lax-no-cover are now fully verified, and we only suppress branch-exit tracking on async with lines that have no actual branching. Matches the existing no branch on the phase-2 ClientSession lines in these same tests. --- tests/shared/test_streamable_http.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index 3c511c250..61ba4a2e5 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -1132,7 +1132,7 @@ async def test_streamable_http_client_session_termination(basic_server: None, ba read_stream, write_stream, ): - async with ClientSession(read_stream, write_stream) as session: + async with ClientSession(read_stream, write_stream) as session: # pragma: no branch # Initialize the session result = await session.initialize() assert isinstance(result, InitializeResult) @@ -1193,7 +1193,7 @@ async def mock_delete(self: httpx.AsyncClient, *args: Any, **kwargs: Any) -> htt read_stream, write_stream, ): - async with ClientSession(read_stream, write_stream) as session: + async with ClientSession(read_stream, write_stream) as session: # pragma: no branch # Initialize the session result = await session.initialize() assert isinstance(result, InitializeResult) @@ -1251,7 +1251,9 @@ async def on_resumption_token_update(token: str) -> None: read_stream, write_stream, ): - async with ClientSession(read_stream, write_stream, message_handler=message_handler) as session: + async with ClientSession( # pragma: no branch + read_stream, write_stream, message_handler=message_handler + ) as session: # Initialize the session result = await session.initialize() assert isinstance(result, InitializeResult)