Skip to content

Commit 6577e50

Browse files
authored
feat(mcp): add complete MCP support with remote registry integration (#544)
* feat: add MCP notification types for server lifecycle events * feat: add MCP remote registry client with TTL caching Implement McpRegistryClient for fetching MCP servers from registry.cortex.foundation/mcp with local TTL-based caching. Features: - In-memory cache with configurable TTL (default 1 hour) - Persistent disk cache for offline resilience - Search by name, description, tags, or category - Fallback servers when registry is unavailable - Configurable registry URL and cache directory * feat: integrate MCP lifecycle notifications with connection manager * feat: update TUI MCP Manager Modal to support remote registry - Add RegistryEntry and RegistrySource types in registry.rs - Add From<RegistryServer> impl to convert remote entries - Add get_local_registry_entries() and get_remote_server_config() - Add RegistryLoadState enum in types.rs - Update SelectFromRegistry mode to include entries and load_state - Update handlers to work with new RegistryEntry structure - Update rendering to display category and required env indicators - Re-export remote registry types for future async loading
1 parent 2cb9f0c commit 6577e50

File tree

10 files changed

+961
-37
lines changed

10 files changed

+961
-37
lines changed

src/cortex-engine/src/mcp/manager.rs

Lines changed: 111 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,29 @@ use std::sync::Arc;
1111

1212
use anyhow::{Result, anyhow};
1313
use serde_json::Value;
14-
use tokio::sync::{Mutex, RwLock};
14+
use tokio::sync::{Mutex, RwLock, mpsc};
1515
use tracing::warn;
1616

1717
use cortex_mcp_types::{CallToolResult, Resource, Tool};
1818

1919
use super::McpServerConfig;
2020
use super::client::{ConnectionState, McpClient};
2121

22+
/// MCP lifecycle event for notifications
23+
#[derive(Debug, Clone)]
24+
pub enum McpLifecycleEvent {
25+
/// Server was added to the manager
26+
ServerAdded { name: String },
27+
/// Server was connected and tools discovered
28+
ServerConnected { name: String, tool_count: usize, tool_names: Vec<String> },
29+
/// Server was disconnected
30+
ServerDisconnected { name: String },
31+
/// Server was removed from the manager
32+
ServerRemoved { name: String },
33+
/// Connection failed
34+
ConnectionFailed { name: String, error: String },
35+
}
36+
2237
/// Delimiter used to create qualified tool names.
2338
const TOOL_NAME_DELIMITER: &str = "__";
2439

@@ -33,6 +48,8 @@ pub struct McpConnectionManager {
3348
configs: RwLock<HashMap<String, McpServerConfig>>,
3449
/// Set of servers currently being started (prevents race conditions).
3550
starting_servers: Mutex<HashSet<String>>,
51+
/// Event sender for lifecycle notifications
52+
event_tx: Option<mpsc::UnboundedSender<McpLifecycleEvent>>,
3653
}
3754

3855
impl Default for McpConnectionManager {
@@ -48,6 +65,29 @@ impl McpConnectionManager {
4865
clients: RwLock::new(HashMap::new()),
4966
configs: RwLock::new(HashMap::new()),
5067
starting_servers: Mutex::new(HashSet::new()),
68+
event_tx: None,
69+
}
70+
}
71+
72+
/// Create a new connection manager with an event sender
73+
pub fn with_event_sender(tx: mpsc::UnboundedSender<McpLifecycleEvent>) -> Self {
74+
Self {
75+
clients: RwLock::new(HashMap::new()),
76+
configs: RwLock::new(HashMap::new()),
77+
starting_servers: Mutex::new(HashSet::new()),
78+
event_tx: Some(tx),
79+
}
80+
}
81+
82+
/// Set the event sender for lifecycle notifications
83+
pub fn set_event_sender(&mut self, tx: mpsc::UnboundedSender<McpLifecycleEvent>) {
84+
self.event_tx = Some(tx);
85+
}
86+
87+
/// Helper to send an event
88+
fn send_event(&self, event: McpLifecycleEvent) {
89+
if let Some(ref tx) = self.event_tx {
90+
let _ = tx.send(event);
5191
}
5292
}
5393

@@ -61,11 +101,15 @@ impl McpConnectionManager {
61101

62102
// Create client but don't connect yet
63103
let client = Arc::new(McpClient::new(config));
64-
self.clients.write().await.insert(name, client);
104+
self.clients.write().await.insert(name.clone(), client);
105+
106+
self.send_event(McpLifecycleEvent::ServerAdded { name });
65107
}
66108

67109
/// Remove a server.
68110
pub async fn remove_server(&self, name: &str) -> Result<()> {
111+
self.send_event(McpLifecycleEvent::ServerRemoved { name: name.to_string() });
112+
69113
// Disconnect if connected
70114
if let Some(client) = self.clients.write().await.remove(name) {
71115
client.disconnect().await?;
@@ -150,6 +194,28 @@ impl McpConnectionManager {
150194
starting.remove(name);
151195
}
152196

197+
// Emit lifecycle event based on connection result
198+
match &result {
199+
Ok(()) => {
200+
// After successful connect, get tools and emit event
201+
if let Some(client) = self.clients.read().await.get(name).cloned() {
202+
let tools = client.tools().await;
203+
let tool_names: Vec<String> = tools.iter().map(|t| t.name.clone()).collect();
204+
self.send_event(McpLifecycleEvent::ServerConnected {
205+
name: name.to_string(),
206+
tool_count: tools.len(),
207+
tool_names,
208+
});
209+
}
210+
}
211+
Err(e) => {
212+
self.send_event(McpLifecycleEvent::ConnectionFailed {
213+
name: name.to_string(),
214+
error: e.to_string(),
215+
});
216+
}
217+
}
218+
153219
result
154220
}
155221

@@ -163,7 +229,13 @@ impl McpConnectionManager {
163229
.cloned()
164230
.ok_or_else(|| anyhow!("Server not found: {}", name))?;
165231

166-
client.disconnect().await
232+
let result = client.disconnect().await;
233+
234+
if result.is_ok() {
235+
self.send_event(McpLifecycleEvent::ServerDisconnected { name: name.to_string() });
236+
}
237+
238+
result
167239
}
168240

169241
/// Connect to all servers.
@@ -402,4 +474,40 @@ mod tests {
402474
manager.remove_server("test").await.unwrap();
403475
assert!(manager.server_names().await.is_empty());
404476
}
477+
478+
#[tokio::test]
479+
async fn test_lifecycle_events() {
480+
let (tx, mut rx) = mpsc::unbounded_channel();
481+
let manager = McpConnectionManager::with_event_sender(tx);
482+
483+
let config = McpServerConfig::new("test", "echo");
484+
manager.add_server(config).await;
485+
486+
// Should receive ServerAdded event
487+
let event = rx.try_recv();
488+
assert!(matches!(event, Ok(McpLifecycleEvent::ServerAdded { name }) if name == "test"));
489+
490+
manager.remove_server("test").await.unwrap();
491+
492+
// Should receive ServerRemoved event
493+
let event = rx.try_recv();
494+
assert!(matches!(event, Ok(McpLifecycleEvent::ServerRemoved { name }) if name == "test"));
495+
}
496+
497+
#[tokio::test]
498+
async fn test_manager_with_event_sender() {
499+
let (tx, _rx) = mpsc::unbounded_channel();
500+
let manager = McpConnectionManager::with_event_sender(tx);
501+
assert!(manager.event_tx.is_some());
502+
}
503+
504+
#[tokio::test]
505+
async fn test_manager_set_event_sender() {
506+
let mut manager = McpConnectionManager::new();
507+
assert!(manager.event_tx.is_none());
508+
509+
let (tx, _rx) = mpsc::unbounded_channel();
510+
manager.set_event_sender(tx);
511+
assert!(manager.event_tx.is_some());
512+
}
405513
}

src/cortex-engine/src/mcp/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ pub mod client;
1010
pub mod manager;
1111
pub mod oauth;
1212
pub mod oauth_callback;
13+
pub mod registry;
1314

1415
// OAuth exports - these are actively used by cortex-cli
1516
pub use oauth::{
@@ -25,6 +26,12 @@ pub use oauth_callback::{
2526
pub use client::{ConnectionState, McpClient};
2627
pub use manager::{McpConnectionManager, create_qualified_name, parse_qualified_name};
2728

29+
// Registry exports
30+
pub use registry::{
31+
McpRegistryClient, RegistryServer, RegistryInstallConfig, StdioConfig, HttpConfig,
32+
REGISTRY_URL, DEFAULT_CACHE_TTL,
33+
};
34+
2835
use std::collections::HashMap;
2936
use std::path::PathBuf;
3037
use std::time::Duration;

0 commit comments

Comments
 (0)