-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathrun_mcp.py
More file actions
50 lines (40 loc) · 1.51 KB
/
run_mcp.py
File metadata and controls
50 lines (40 loc) · 1.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import argparse
import asyncio
from openmanus_max.agent.manus import ManusAgent
from openmanus_max.core.logger import get_logger
from openmanus_max.tool.mcp.client import MCPClient
# Shared logger for this script, requested under the name "openmanus_max".
logger = get_logger("openmanus_max")
async def run_mcp():
    """Connect to an MCP server and run one prompt through a ManusAgent.

    Command-line options:
        --server  (required) command string handed to ``MCPClient``.
        --prompt  (optional) prompt text; when omitted or empty, the user
                  is asked for one on stdin.

    The agent's result is printed when it is truthy.  ``KeyboardInterrupt``
    and any other ``Exception`` raised while connecting, prompting, or
    running the agent are logged instead of being propagated.
    """
    cli = argparse.ArgumentParser(
        description="Run OpenManus-Max with MCP server connection"
    )
    cli.add_argument(
        "--server",
        required=True,
        type=str,
        help="MCP server command (e.g., 'npx -y @modelcontextprotocol/server-filesystem /tmp')",
    )
    cli.add_argument(
        "--prompt",
        required=False,
        type=str,
        help="Input prompt for the agent",
    )
    options = cli.parse_args()

    # The agent is built before the guarded section, so a constructor
    # failure propagates to the caller rather than being logged below.
    agent = ManusAgent()

    try:
        # Connect to MCP server
        server_command = options.server
        mcp_client = MCPClient(server_command=server_command)
        logger.info(f"Connecting to MCP server: {server_command}")

        # NOTE(review): the discovered tools are only counted here; nothing
        # in this function passes them to `agent` -- confirm whether the
        # agent picks them up some other way.
        # NOTE(review): `mcp_client` is never closed/disconnected in this
        # function; its shutdown API is not visible from this file -- TODO
        # confirm and release it on every exit path.
        discovered = await mcp_client.connect()
        logger.info(f"Discovered {len(discovered)} tools from MCP server")

        # A missing or empty --prompt falls back to an interactive prompt.
        prompt = options.prompt or input("Enter your prompt: ")

        if prompt.strip():
            logger.info("Processing your request...")
            outcome = await agent.run(prompt)
            logger.info("Request processing completed.")
            if outcome:
                print(outcome)
        else:
            logger.warning("Empty prompt provided.")
    except KeyboardInterrupt:
        logger.warning("Operation interrupted.")
    except Exception as err:
        logger.error(f"Error: {str(err)}")
if __name__ == "__main__":
    # asyncio.run() surfaces Ctrl-C as KeyboardInterrupt at this call site.
    # On Python 3.11+ it first cancels the main task, so the coroutine sees
    # CancelledError rather than KeyboardInterrupt while it is awaiting.
    # Catching it here ends an interrupted run with a log line instead of
    # a traceback.
    try:
        asyncio.run(run_mcp())
    except KeyboardInterrupt:
        logger.warning("Operation interrupted.")