Skip to content

Commit e573042

Browse files
committed
Use MCP server in workflow with in-memory transport
1 parent 0c10b57 commit e573042

5 files changed

Lines changed: 83 additions & 169 deletions

File tree

nexusmcp/__init__.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
from temporalio import workflow
1+
import temporalio.workflow
2+
3+
with temporalio.workflow.unsafe.imports_passed_through():
4+
from nexusmcp import workflow
25

3-
with workflow.unsafe.imports_passed_through():
46
from .inbound_gateway import InboundGateway
57
from .service import MCPService
68
from .service_handler import MCPServiceHandler, exclude
7-
from .workflow_transport import WorkflowTransport
89

9-
__all__ = ["MCPService", "MCPServiceHandler", "InboundGateway", "exclude", "WorkflowTransport"]
10+
__all__ = ["MCPService", "MCPServiceHandler", "InboundGateway", "exclude", "workflow"]

nexusmcp/workflow/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .mcp_client import MCPClient
2+
3+
__all__ = ["MCPClient"]

nexusmcp/workflow/mcp_client.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
from contextlib import asynccontextmanager
2+
from typing import Any, AsyncGenerator
3+
4+
import mcp.types as types
5+
from mcp.client.session import ClientSession
6+
from mcp.server.lowlevel import Server
7+
from mcp.shared.memory import create_connected_server_and_client_session
8+
from temporalio import workflow
9+
10+
from nexusmcp.service import MCPService
11+
12+
13+
class MCPClient:
14+
"""
15+
An MCP client for use in Temporal workflows.
16+
17+
This class provides a client that proxies MCP traffic from a Temporal Workflow to a Temporal
18+
Nexus service. It works by running an MCP server in the workflow whose handlers delegate to
19+
nexus operations, and connecting to it via an in-memory transport.
20+
21+
Example:
22+
```python
23+
client = MCPClient("my-endpoint")
24+
async with client.connect() as session:
25+
await session.list_tools()
26+
await session.call_tool("my-service_my-operation", {"arg": "value"})
27+
```
28+
"""
29+
30+
def __init__(
31+
self,
32+
endpoint: str,
33+
):
34+
self.endpoint = endpoint
35+
# Run an in-workflow MCP server whose handlers make nexus calls
36+
self.mcp_server = Server("workflow-gateway-mcp-server")
37+
self.mcp_server.list_tools()(self._handle_list_tools) # type: ignore[no-untyped-call]
38+
self.mcp_server.call_tool()(self._handle_call_tool)
39+
40+
@asynccontextmanager
41+
async def connect(self) -> AsyncGenerator[ClientSession, None]:
42+
"""
43+
Create a connected MCP ClientSession.
44+
45+
The session is automatically initialized before being yielded.
46+
"""
47+
async with create_connected_server_and_client_session(
48+
self.mcp_server,
49+
raise_exceptions=True,
50+
) as session:
51+
yield session
52+
53+
async def _handle_list_tools(self) -> list[types.Tool]:
54+
nexus_client = workflow.create_nexus_client(
55+
endpoint=self.endpoint,
56+
service=MCPService,
57+
)
58+
return await nexus_client.execute_operation(MCPService.list_tools, None)
59+
60+
async def _handle_call_tool(self, name: str, arguments: dict[str, Any]) -> Any:
61+
service, _, operation = name.partition("_")
62+
nexus_client = workflow.create_nexus_client(
63+
endpoint=self.endpoint,
64+
service=service,
65+
)
66+
return await nexus_client.execute_operation(operation, arguments)

nexusmcp/workflow_transport.py

Lines changed: 0 additions & 152 deletions
This file was deleted.

tests/test_workflow_caller.py

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,14 @@
22
from dataclasses import dataclass
33

44
import pytest
5-
from mcp import ClientSession
65
from temporalio import workflow
76
from temporalio.api.nexus.v1 import EndpointSpec, EndpointTarget
87
from temporalio.api.operatorservice.v1 import CreateNexusEndpointRequest
98
from temporalio.contrib.pydantic import pydantic_data_converter
109
from temporalio.testing import WorkflowEnvironment
1110
from temporalio.worker import Worker
1211

13-
from nexusmcp import WorkflowTransport
12+
import nexusmcp.workflow
1413

1514
from .service import TestServiceHandler, mcp_service
1615

@@ -26,18 +25,15 @@ class MCPCallerWorkflowInput:
2625
class MCPCallerWorkflow:
2726
@workflow.run
2827
async def run(self, input: MCPCallerWorkflowInput) -> None:
29-
transport = WorkflowTransport(input.endpoint)
30-
async with transport.connect() as (read_stream, write_stream):
31-
async with ClientSession(read_stream, write_stream) as session:
32-
await session.initialize()
28+
async with nexusmcp.workflow.MCPClient(input.endpoint).connect() as session:
29+
# Session is already initialized
30+
list_tools_result = await session.list_tools()
31+
assert len(list_tools_result.tools) == 2
32+
assert list_tools_result.tools[0].name == "modified-service-name_modified-op-name"
33+
assert list_tools_result.tools[1].name == "modified-service-name_op2"
3334

34-
list_tools_result = await session.list_tools()
35-
assert len(list_tools_result.tools) == 2
36-
assert list_tools_result.tools[0].name == "modified-service-name_modified-op-name"
37-
assert list_tools_result.tools[1].name == "modified-service-name_op2"
38-
39-
call_result = await session.call_tool("modified-service-name_modified-op-name", {"name": "World"})
40-
assert call_result.structuredContent == {"message": "Hello, World"}
35+
call_result = await session.call_tool("modified-service-name_modified-op-name", {"name": "World"})
36+
assert call_result.structuredContent == {"message": "Hello, World"}
4137

4238

4339
@pytest.mark.asyncio

0 commit comments

Comments
 (0)