-
-
Notifications
You must be signed in to change notification settings - Fork 114
Add Streamable HTTP protocol support and support for self-signed certs #122
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -39,12 +39,21 @@ class Sampling(BaseModel): | |||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| class SSEMCPServer(BaseModel): | ||||||||||||||||||||||||||||||||||||||||||||||||||
| # TODO: expand this once I find a good definition for this | ||||||||||||||||||||||||||||||||||||||||||||||||||
| url: str = Field(description="URL of the MCP server") | ||||||||||||||||||||||||||||||||||||||||||||||||||
| headers: dict[str, str] = Field(default_factory=dict, description="HTTP headers to send with requests") | ||||||||||||||||||||||||||||||||||||||||||||||||||
| ssl_verify: bool | str = Field(default=True, description="SSL verification: True, False, or path to a CA bundle file") | ||||||||||||||||||||||||||||||||||||||||||||||||||
| transport: Literal["sse"] = Field(default="sse") | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| class StreamableHTTPMCPServer(BaseModel): | ||||||||||||||||||||||||||||||||||||||||||||||||||
| url: str = Field(description="URL of the MCP server") | ||||||||||||||||||||||||||||||||||||||||||||||||||
| headers: dict[str, str] = Field(default_factory=dict, description="HTTP headers to send with requests") | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+43
to
+50
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| headers: dict[str, str] = Field(default_factory=dict, description="HTTP headers to send with requests") | |
| ssl_verify: bool | str = Field(default=True, description="SSL verification: True, False, or path to a CA bundle file") | |
| transport: Literal["sse"] = Field(default="sse") | |
| class StreamableHTTPMCPServer(BaseModel): | |
| url: str = Field(description="URL of the MCP server") | |
| headers: dict[str, str] = Field(default_factory=dict, description="HTTP headers to send with requests") | |
| headers: dict[str, str] = Field( | |
| default_factory=dict, | |
| description="HTTP headers to send with requests", | |
| repr=False, | |
| ) | |
| ssl_verify: bool | str = Field(default=True, description="SSL verification: True, False, or path to a CA bundle file") | |
| transport: Literal["sse"] = Field(default="sse") | |
| class StreamableHTTPMCPServer(BaseModel): | |
| url: str = Field(description="URL of the MCP server") | |
| headers: dict[str, str] = Field( | |
| default_factory=dict, | |
| description="HTTP headers to send with requests", | |
| repr=False, | |
| ) |
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,4 +1,5 @@ | ||||||||||||
| import asyncio | ||||||||||||
| import httpx | ||||||||||||
| from mcp.client.sse import sse_client | ||||||||||||
| from mcp_bridge.config import config | ||||||||||||
| from mcp_bridge.config.final import SSEMCPServer | ||||||||||||
|
|
@@ -16,22 +17,41 @@ def __init__(self, name: str, config: SSEMCPServer) -> None: | |||||||||||
| self.config = config | ||||||||||||
|
|
||||||||||||
| async def _maintain_session(self): | ||||||||||||
| async with sse_client(self.config.url) as client: | ||||||||||||
| async with McpClientSession(*client) as session: | ||||||||||||
| await session.initialize() | ||||||||||||
| logger.debug(f"finished initialise session for {self.name}") | ||||||||||||
| self.session = session | ||||||||||||
|
|
||||||||||||
| try: | ||||||||||||
| while True: | ||||||||||||
| await asyncio.sleep(10) | ||||||||||||
| if config.logging.log_server_pings: | ||||||||||||
| logger.debug(f"pinging session for {self.name}") | ||||||||||||
|
|
||||||||||||
| await session.send_ping() | ||||||||||||
|
|
||||||||||||
| except Exception as exc: | ||||||||||||
| logger.error(f"ping failed for {self.name}: {exc}") | ||||||||||||
| self.session = None | ||||||||||||
| ssl_verify = self.config.ssl_verify | ||||||||||||
|
|
||||||||||||
| if ssl_verify is not True: | ||||||||||||
| # sse_client doesn't expose ssl options, so briefly patch httpx.AsyncClient | ||||||||||||
| # to inject the verify setting before it creates its internal client. | ||||||||||||
| _original_init = httpx.AsyncClient.__init__ | ||||||||||||
|
|
||||||||||||
| def _patched_init(client_self, *args, **kwargs): | ||||||||||||
| kwargs.setdefault("verify", ssl_verify) | ||||||||||||
| _original_init(client_self, *args, **kwargs) | ||||||||||||
|
|
||||||||||||
| httpx.AsyncClient.__init__ = _patched_init | ||||||||||||
|
|
||||||||||||
|
Comment on lines
+20
to
+32
|
||||||||||||
| try: | ||||||||||||
| async with sse_client( | ||||||||||||
| self.config.url, headers=self.config.headers or None | ||||||||||||
| ) as client: | ||||||||||||
| async with McpClientSession(*client) as session: | ||||||||||||
| await session.initialize() | ||||||||||||
| logger.debug(f"finished initialise session for {self.name}") | ||||||||||||
| self.session = session | ||||||||||||
|
|
||||||||||||
| try: | ||||||||||||
| while True: | ||||||||||||
| await asyncio.sleep(10) | ||||||||||||
| if config.logging.log_server_pings: | ||||||||||||
| logger.debug(f"pinging session for {self.name}") | ||||||||||||
|
|
||||||||||||
| await session.send_ping() | ||||||||||||
|
|
||||||||||||
|
||||||||||||
| except asyncio.CancelledError: | |
| self.session = None | |
| raise |
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,56 @@ | ||||||||||
| import asyncio | ||||||||||
| import httpx | ||||||||||
| from mcp.client.streamable_http import streamablehttp_client | ||||||||||
| from mcp_bridge.config import config | ||||||||||
| from mcp_bridge.config.final import StreamableHTTPMCPServer | ||||||||||
| from mcp_bridge.mcp_clients.session import McpClientSession | ||||||||||
| from .AbstractClient import GenericMcpClient | ||||||||||
| from loguru import logger | ||||||||||
|
|
||||||||||
|
|
||||||||||
| class StreamableHttpClient(GenericMcpClient): | ||||||||||
| config: StreamableHTTPMCPServer | ||||||||||
|
|
||||||||||
| def __init__(self, name: str, config: StreamableHTTPMCPServer) -> None: | ||||||||||
| super().__init__(name=name) | ||||||||||
| self.config = config | ||||||||||
|
|
||||||||||
| async def _maintain_session(self): | ||||||||||
| ssl_verify = self.config.ssl_verify | ||||||||||
|
|
||||||||||
| if ssl_verify is not True: | ||||||||||
| # streamablehttp_client doesn't expose ssl options, so briefly patch | ||||||||||
| # httpx.AsyncClient to inject the verify setting. | ||||||||||
| _original_init = httpx.AsyncClient.__init__ | ||||||||||
| def _patched_init(client_self, *args, **kwargs): | ||||||||||
| kwargs.setdefault("verify", ssl_verify) | ||||||||||
| _original_init(client_self, *args, **kwargs) | ||||||||||
| httpx.AsyncClient.__init__ = _patched_init | ||||||||||
|
Comment on lines
+19
to
+28
|
||||||||||
|
|
||||||||||
| try: | ||||||||||
| async with streamablehttp_client( | ||||||||||
| self.config.url, | ||||||||||
| headers=self.config.headers or None, | ||||||||||
| ) as streams: | ||||||||||
| # SDK may yield (read, write) or (read, write, session_id_cb) | ||||||||||
| read_stream, write_stream = streams[0], streams[1] | ||||||||||
| async with McpClientSession(read_stream, write_stream) as session: | ||||||||||
| await session.initialize() | ||||||||||
| logger.debug(f"finished initialise session for {self.name}") | ||||||||||
| self.session = session | ||||||||||
|
|
||||||||||
| try: | ||||||||||
| while True: | ||||||||||
| await asyncio.sleep(10) | ||||||||||
| if config.logging.log_server_pings: | ||||||||||
| logger.debug(f"pinging session for {self.name}") | ||||||||||
| await session.send_ping() | ||||||||||
|
|
||||||||||
|
||||||||||
| except asyncio.CancelledError: | |
| raise |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,7 +11,7 @@ dependencies = [ | |
| "httpx-sse>=0.4.0", | ||
| "lmos-openai-types", | ||
| "loguru>=0.7.3", | ||
| "mcp>=1.2.0,<=1.7.1", | ||
| "mcp>=1.8.0,<=1.9.3", | ||
| "mcpx[docker]>=0.1.1", | ||
|
Comment on lines
11
to
15
|
||
| "opentelemetry-api>=1.33.1", | ||
| "opentelemetry-exporter-otlp>=1.33.1", | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This section says MCP-Bridge supports “three transport types”, but the code/config also supports Docker-based MCP servers (
DockerMCPServer). Either include Docker in this list (and document its config shape) or clarify that this list is only for non-Docker transports, to avoid misleading users.