|
| 1 | +from contextlib import asynccontextmanager |
| 2 | +from typing import Any, AsyncGenerator |
| 3 | + |
| 4 | +import mcp.types as types |
| 5 | +from mcp.client.session import ClientSession |
| 6 | +from mcp.server.lowlevel import Server |
| 7 | +from mcp.shared.memory import create_connected_server_and_client_session |
| 8 | +from temporalio import workflow |
| 9 | + |
| 10 | +from nexusmcp.service import MCPService |
| 11 | + |
| 12 | + |
| 13 | +class MCPClient: |
| 14 | + """ |
| 15 | + An MCP client for use in Temporal workflows. |
| 16 | +
|
| 17 | + This class provides a client that proxies MCP traffic from a Temporal Workflow to a Temporal |
| 18 | + Nexus service. It works by running an MCP server in the workflow whose handlers delegate to |
| 19 | + nexus operations, and connecting to it via an in-memory transport. |
| 20 | +
|
| 21 | + Example: |
| 22 | + ```python |
| 23 | + client = MCPClient("my-endpoint") |
| 24 | + async with client.connect() as session: |
| 25 | + await session.list_tools() |
| 26 | + await session.call_tool("my-service_my-operation", {"arg": "value"}) |
| 27 | + ``` |
| 28 | + """ |
| 29 | + |
| 30 | + def __init__( |
| 31 | + self, |
| 32 | + endpoint: str, |
| 33 | + ): |
| 34 | + self.endpoint = endpoint |
| 35 | + # Run an in-workflow MCP server whose handlers make nexus calls |
| 36 | + self.mcp_server = Server("workflow-gateway-mcp-server") |
| 37 | + self.mcp_server.list_tools()(self._handle_list_tools) # type: ignore[no-untyped-call] |
| 38 | + self.mcp_server.call_tool()(self._handle_call_tool) |
| 39 | + |
| 40 | + @asynccontextmanager |
| 41 | + async def connect(self) -> AsyncGenerator[ClientSession, None]: |
| 42 | + """ |
| 43 | + Create a connected MCP ClientSession. |
| 44 | +
|
| 45 | + The session is automatically initialized before being yielded. |
| 46 | + """ |
| 47 | + async with create_connected_server_and_client_session( |
| 48 | + self.mcp_server, |
| 49 | + raise_exceptions=True, |
| 50 | + ) as session: |
| 51 | + yield session |
| 52 | + |
| 53 | + async def _handle_list_tools(self) -> list[types.Tool]: |
| 54 | + nexus_client = workflow.create_nexus_client( |
| 55 | + endpoint=self.endpoint, |
| 56 | + service=MCPService, |
| 57 | + ) |
| 58 | + return await nexus_client.execute_operation(MCPService.list_tools, None) |
| 59 | + |
| 60 | + async def _handle_call_tool(self, name: str, arguments: dict[str, Any]) -> Any: |
| 61 | + service, _, operation = name.partition("_") |
| 62 | + nexus_client = workflow.create_nexus_client( |
| 63 | + endpoint=self.endpoint, |
| 64 | + service=service, |
| 65 | + ) |
| 66 | + return await nexus_client.execute_operation(operation, arguments) |
0 commit comments