-
Notifications
You must be signed in to change notification settings - Fork 43
Expand file tree
/
Copy pathtest_mcp_http_transport.py
More file actions
193 lines (158 loc) · 6.55 KB
/
test_mcp_http_transport.py
File metadata and controls
193 lines (158 loc) · 6.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
"""
Tests for the MCP transport interface with HTTP transport.
"""
import sys
import pytest
import pytest_asyncio
import asyncio
import subprocess
import time
import os
import socket
from typing import List, Optional, Tuple
from utcp_mcp.mcp_call_template import McpCallTemplate, McpConfig
from utcp_mcp.mcp_communication_protocol import McpCommunicationProtocol
HTTP_SERVER_NAME = "mock_http_server"
HTTP_SERVER_PORT = 8000
def _terminate_and_drain(
    process: subprocess.Popen, timeout: float = 5
) -> Tuple[bytes, bytes]:
    """Stop *process* and return whatever it wrote to its captured pipes.

    Sends a terminate request first and escalates to a kill if the child has
    not exited within *timeout* seconds. Using ``communicate`` (rather than
    ``wait``) also reads and closes the stdout/stderr pipes, so they are not
    left open after the fixture finishes.
    """
    process.terminate()
    try:
        return process.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        # The child ignored the terminate request; force it down and collect
        # the remaining output.
        process.kill()
        return process.communicate()


@pytest_asyncio.fixture
async def http_server_process() -> subprocess.Popen:
    """Start the mock HTTP MCP server as a separate process and yield it.

    The server script ``mock_http_mcp_server.py`` is expected to live next to
    this test module. Readiness is detected by polling a TCP connection to
    ``127.0.0.1:HTTP_SERVER_PORT`` once per second for up to 30 attempts.

    Yields:
        The ``subprocess.Popen`` handle of the running server.

    Raises:
        RuntimeError: If the server process exits during startup or the port
            never becomes reachable. The message includes the captured
            stdout and stderr of the child.
    """
    server_path = os.path.join(
        os.path.dirname(__file__), "mock_http_mcp_server.py"
    )
    process = subprocess.Popen(
        [sys.executable, server_path],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )

    server_ready = False
    for _ in range(30):  # Wait up to roughly 30 seconds
        if process.poll() is not None:
            # The child already exited (e.g. import error or port in use);
            # there is no point in waiting any longer.
            break
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.settimeout(1)
                if s.connect_ex(("127.0.0.1", HTTP_SERVER_PORT)) == 0:
                    server_ready = True
                    break
        except OSError:
            # Socket creation/connection problems just mean "not ready yet".
            pass
        await asyncio.sleep(1)

    if server_ready:
        # Give the server a bit more time to fully initialize
        await asyncio.sleep(2)
        # If the port was answered by some other process while ours failed to
        # bind, our child will have exited by now; treat that as a failure.
        server_ready = process.poll() is None

    if not server_ready:
        stdout, stderr = _terminate_and_drain(process)
        raise RuntimeError(
            f"HTTP server failed to start. stdout: {stdout.decode(errors='replace')}, "
            f"stderr: {stderr.decode(errors='replace')}"
        )

    yield process

    # Clean up the process and release its pipes
    _terminate_and_drain(process)
@pytest.fixture
def http_mcp_provider() -> McpCallTemplate:
    """Provide an McpCallTemplate configured to connect to the mock HTTP server.

    This fixture is synchronous, so it is registered with the plain
    ``pytest.fixture`` decorator rather than the asyncio-specific one.

    Returns:
        A call template named ``"mock_http_provider"`` whose MCP config holds
        a single server entry, keyed by ``HTTP_SERVER_NAME``, pointing at
        ``http://127.0.0.1:<HTTP_SERVER_PORT>/mcp`` with the ``"http"``
        transport.
    """
    server_config = {
        "url": f"http://127.0.0.1:{HTTP_SERVER_PORT}/mcp",
        "transport": "http",
    }
    return McpCallTemplate(
        name="mock_http_provider",
        call_template_type="mcp",
        config=McpConfig(mcpServers={HTTP_SERVER_NAME: server_config}),
    )
@pytest_asyncio.fixture
async def transport() -> McpCommunicationProtocol:
    """Yield a freshly constructed McpCommunicationProtocol for each test."""
    yield McpCommunicationProtocol()
@pytest.mark.asyncio
async def test_http_register_manual_discovers_tools(
    transport: McpCommunicationProtocol,
    http_mcp_provider: McpCallTemplate,
    http_server_process: subprocess.Popen,
):
    """Registering the HTTP MCP manual should expose the four expected tools."""
    outcome = await transport.register_manual(None, http_mcp_provider)
    assert outcome.success

    discovered = outcome.manual.tools
    assert len(discovered) == 4

    # The echo tool must be present and carry its description.
    echo_name = f"{HTTP_SERVER_NAME}.echo"
    echo_matches = [tool for tool in discovered if tool.name == echo_name]
    assert echo_matches
    assert "echoes back its input" in echo_matches[0].description

    # The remaining tools are only checked by their namespaced names.
    discovered_names = [tool.name for tool in discovered]
    for short_name in ("greet", "list_items", "add_numbers"):
        assert f"{HTTP_SERVER_NAME}.{short_name}" in discovered_names
@pytest.mark.asyncio
async def test_http_structured_output(
    transport: McpCommunicationProtocol,
    http_mcp_provider: McpCallTemplate,
    http_server_process: subprocess.Popen,
):
    """The echo tool should return its structured (dict) reply over HTTP."""
    # The manual has to be registered before tools can be called.
    await transport.register_manual(None, http_mcp_provider)

    tool_name = f"{HTTP_SERVER_NAME}.echo"
    arguments = {"message": "http_test"}
    reply = await transport.call_tool(
        None, tool_name, arguments, http_mcp_provider
    )

    assert reply == {"reply": "you said: http_test"}
@pytest.mark.asyncio
async def test_http_unstructured_output(
    transport: McpCommunicationProtocol,
    http_mcp_provider: McpCallTemplate,
    http_server_process: subprocess.Popen,
):
    """The greet tool should return its plain-string greeting over HTTP."""
    # The manual has to be registered before tools can be called.
    await transport.register_manual(None, http_mcp_provider)

    tool_name = f"{HTTP_SERVER_NAME}.greet"
    arguments = {"name": "Alice"}
    greeting = await transport.call_tool(
        None, tool_name, arguments, http_mcp_provider
    )

    assert greeting == "Hello, Alice!"
@pytest.mark.asyncio
async def test_http_list_output(
    transport: McpCommunicationProtocol,
    http_mcp_provider: McpCallTemplate,
    http_server_process: subprocess.Popen,
):
    """The list_items tool should return a list with the requested items."""
    # The manual has to be registered before tools can be called.
    await transport.register_manual(None, http_mcp_provider)

    requested = 3
    tool_name = f"{HTTP_SERVER_NAME}.list_items"
    items = await transport.call_tool(
        None, tool_name, {"count": requested}, http_mcp_provider
    )

    # Check the container type and size before comparing the contents.
    assert isinstance(items, list)
    assert len(items) == requested
    assert items == ["item_0", "item_1", "item_2"]
@pytest.mark.asyncio
async def test_http_numeric_output(
    transport: McpCommunicationProtocol,
    http_mcp_provider: McpCallTemplate,
    http_server_process: subprocess.Popen,
):
    """The add_numbers tool should return the numeric sum of its inputs."""
    # The manual has to be registered before tools can be called.
    await transport.register_manual(None, http_mcp_provider)

    tool_name = f"{HTTP_SERVER_NAME}.add_numbers"
    operands = {"a": 5, "b": 7}
    total = await transport.call_tool(
        None, tool_name, operands, http_mcp_provider
    )

    assert total == 12
@pytest.mark.asyncio
async def test_http_deregister_manual(
    transport: McpCommunicationProtocol,
    http_mcp_provider: McpCallTemplate,
    http_server_process: subprocess.Popen,
):
    """Deregistering an HTTP MCP manual works (no-op in session-per-operation mode)."""
    # Start from a successfully registered manual with all four tools.
    outcome = await transport.register_manual(None, http_mcp_provider)
    assert outcome.success
    assert len(outcome.manual.tools) == 4

    # Deregistration is a no-op in session-per-operation mode.
    await transport.deregister_manual(None, http_mcp_provider)

    # A fresh session is created per call, so the tool is still callable.
    tool_name = f"{HTTP_SERVER_NAME}.echo"
    reply = await transport.call_tool(
        None, tool_name, {"message": "test"}, http_mcp_provider
    )
    assert reply == {"reply": "you said: test"}