diff --git a/crates/openfang-api/src/bridge_ipc.rs b/crates/openfang-api/src/bridge_ipc.rs index 54ba9d25c..19ec85c88 100644 --- a/crates/openfang-api/src/bridge_ipc.rs +++ b/crates/openfang-api/src/bridge_ipc.rs @@ -40,9 +40,11 @@ use crate::bridge_auth::BridgeAuthority; use openfang_kernel::OpenFangKernel; use openfang_mcp_bridge::protocol::{ - codec, CallRequest, CallResponse, CallResult, Frame, Hello, HelloAck, PROTOCOL_VERSION, + codec, CallRequest, CallResponse, CallResult, Frame, Hello, HelloAck, ListUpstreamRequest, + UpstreamListResponse, UpstreamListResult, UpstreamToolDef, MAX_FRAME_BYTES, PROTOCOL_VERSION, SOCKET_RELATIVE_PATH, }; +use openfang_runtime::mcp::{extract_mcp_server_from_known, is_mcp_tool}; use openfang_types::agent::AgentId; use openfang_types::bridge_auth::Token; use std::path::{Path, PathBuf}; @@ -322,40 +324,60 @@ async fn handle_connection( Err(e) => return Err(e), }; - let call = match frame { - Frame::Call(c) => c, + match frame { + Frame::Call(call) => { + info!( + request_id = call.request_id, + tool = %call.tool_name, + agent = %call.agent_id, + "bridge IPC: dispatching call" + ); + + let result = dispatch_call(&call, &kernel, identity.agent_id.as_ref()).await; + let result_kind = match &result { + CallResult::Ok { + is_error: false, .. + } => "ok", + CallResult::Ok { is_error: true, .. } => "tool_error", + CallResult::Error { .. } => "dispatch_error", + }; + info!( + request_id = call.request_id, + tool = %call.tool_name, + outcome = result_kind, + "bridge IPC: call complete" + ); + let response = Frame::Response(CallResponse { + request_id: call.request_id, + result, + }); + codec::write_frame(&mut write_half, &response).await?; + } + Frame::ListUpstream(req) => { + info!( + request_id = req.request_id, + "bridge IPC: dispatching list_upstream" + ); + let response = + handle_list_upstream(&req, &kernel, identity.agent_id.as_ref()).await; + let outcome = match &response.result { + UpstreamListResult::Ok { tools } => { + format!("ok({} tools)", tools.len()) + } + UpstreamListResult::Error { .. } => "error".to_string(), + }; + info!( + request_id = response.request_id, + outcome = %outcome, + "bridge IPC: list_upstream complete" + ); + codec::write_frame(&mut write_half, &Frame::UpstreamList(response)).await?; + } other => { warn!(?other, "bridge IPC: unexpected frame in request loop"); continue; } - }; - - info!( - request_id = call.request_id, - tool = %call.tool_name, - agent = %call.agent_id, - "bridge IPC: dispatching call" - ); - - let result = dispatch_call(&call, &kernel, identity.agent_id.as_ref()).await; - let result_kind = match &result { - CallResult::Ok { - is_error: false, .. - } => "ok", - CallResult::Ok { is_error: true, .. } => "tool_error", - CallResult::Error { .. } => "dispatch_error", - }; - info!( - request_id = call.request_id, - tool = %call.tool_name, - outcome = result_kind, - "bridge IPC: call complete" - ); - let response = Frame::Response(CallResponse { - request_id: call.request_id, - result, - }); - codec::write_frame(&mut write_half, &response).await?; + } } } @@ -378,6 +400,16 @@ async fn dispatch_call( kernel: &Arc, authenticated_agent_id: Option<&AgentId>, ) -> CallResult { + // --- Early branch: upstream MCP tool (mcp_*) --------------------------- + // Upstream MCP tools are not in `ALLOWED_TOOLS` and have a separate + // dispatch path: per-agent server allowlist (agent.toml `mcp_servers`, + // default-deny on empty), hardened-token lane only, and direct dispatch + // into `kernel.mcp_connections` rather than `execute_tool`. See + // `dispatch_upstream_mcp_call` for the gates. + if is_mcp_tool(&call.tool_name) { + return dispatch_upstream_mcp_call(call, kernel, authenticated_agent_id).await; + } + // --- Gate 1: static bridge-surface allowlist ---------------------------- // The hard ceiling on what the bridge will ever dispatch. Independent of // any agent's per-agent surface; an unknown tool never reaches identity @@ -589,6 +621,320 @@ async fn dispatch_call( } } +/// Dispatch an upstream MCP tool call (`mcp_{server}_{tool}`) over the +/// kernel's `mcp_connections` registry. +/// +/// Distinct from [`dispatch_call`]'s built-in tool path: +/// +/// - **Hardened-token lane only.** Refuses the legacy +/// self-claimed-`agent_id` lane. Upstream MCP servers can carry +/// secrets (OAuth tokens, page contents, Linear issue bodies) and +/// the legacy lane has no daemon-issued identity to authorize +/// against — fail closed. The hardened lane is the only entry to +/// this surface for v1. +/// +/// - **Per-agent server allowlist with default-deny.** Reads +/// `entry.manifest.mcp_servers` from the registry and rejects any +/// tool whose server prefix is not on the list. **Empty list means +/// no servers allowed** for the bridge path — this is a deliberate +/// semantic departure from the in-process MCP path +/// (`tool_runner.rs`), which historically treated `[]` as +/// "all servers". v1 ships the new semantic for the bridge only; +/// convergence is tracked as follow-up. See design doc §5.4. +/// +/// - **Direct dispatch into `kernel.mcp_connections`.** Bypasses +/// `execute_tool` entirely; the runtime's MCP routing already does +/// what we need, and routing through `execute_tool` would require +/// adding every namespaced tool to its allowlist parameter. +/// +/// Truncation: results larger than the response frame budget are +/// returned with `is_error=true` and an explicit truncation marker +/// rather than silently dropped. Per design doc §6 mitigation table. +async fn dispatch_upstream_mcp_call( + call: &CallRequest, + kernel: &Arc, + authenticated_agent_id: Option<&AgentId>, +) -> CallResult { + // Refuse the legacy lane outright. Upstream MCP forwarding is + // hardened-path-only in v1. + let authed = match authenticated_agent_id { + Some(a) => a, + None => { + warn!( + request_id = call.request_id, + tool = %call.tool_name, + claimed = %call.agent_id, + "bridge IPC: refusing upstream MCP call on legacy token lane" + ); + return CallResult::Error { + message: "upstream MCP tools require a daemon-issued (hex) auth token; legacy lane refused" + .to_string(), + }; + } + }; + + let resolved_agent_id = *authed; + let resolved_agent_id_string = resolved_agent_id.to_string(); + + // Log a mismatch but trust the authenticated identity, consistent + // with the built-in path in `dispatch_call`. + if resolved_agent_id_string != call.agent_id { + warn!( + request_id = call.request_id, + tool = %call.tool_name, + claimed = %call.agent_id, + authenticated = %resolved_agent_id_string, + "bridge IPC (upstream MCP): claimed agent_id disagrees with authenticated identity; using authenticated identity" + ); + } + + let entry = match kernel.registry.get(resolved_agent_id) { + Some(e) => e, + None => { + warn!( + request_id = call.request_id, + tool = %call.tool_name, + agent = %resolved_agent_id_string, + "bridge IPC (upstream MCP): no registry entry for resolved agent" + ); + return CallResult::Error { + message: format!( + "agent '{resolved_agent_id_string}' has no registry entry; refusing upstream MCP call" + ), + }; + } + }; + + // Default-deny: empty `mcp_servers` → no upstream tools allowed. + // This is the bridge-path semantic; the in-process path's + // `[] = all` convention is left undisturbed for now (see design + // doc §5.4). + if entry.manifest.mcp_servers.is_empty() { + warn!( + request_id = call.request_id, + tool = %call.tool_name, + agent = %resolved_agent_id_string, + "bridge IPC (upstream MCP): agent has no mcp_servers allowlist; refusing" + ); + return CallResult::Error { + message: format!( + "agent '{resolved_agent_id_string}' has no MCP servers allowlisted (set `mcp_servers` in agent.toml)" + ), + }; + } + + // Match the tool's server prefix against the allowlist. Use the + // `from_known` helper so server names containing hyphens + // (normalized to underscores in tool names) match correctly. + let allowlist: Vec<&str> = entry + .manifest + .mcp_servers + .iter() + .map(|s| s.as_str()) + .collect(); + let server_name = match extract_mcp_server_from_known(&call.tool_name, &allowlist) { + Some(name) => name.to_string(), + None => { + warn!( + request_id = call.request_id, + tool = %call.tool_name, + agent = %resolved_agent_id_string, + allowlist = ?entry.manifest.mcp_servers, + "bridge IPC (upstream MCP): tool's server prefix not in agent allowlist" + ); + return CallResult::Error { + message: format!( + "upstream MCP tool '{}' is not allowlisted for agent '{}' (allowed servers: {:?})", + call.tool_name, resolved_agent_id_string, entry.manifest.mcp_servers + ), + }; + } + }; + + // Dispatch into the kernel's MCP registry. Hold the lock just + // long enough to issue the call; rmcp's `call_tool` awaits a + // network round-trip, so this *does* serialize concurrent calls + // against the same connection set. That matches the in-process + // path in `tool_runner.rs` and is acceptable for v1 — Linear / + // Notion calls are not hot-path. Revisit when latency complaints + // arrive. + let result_text = { + let mut conns = kernel.mcp_connections.lock().await; + let conn = conns.iter_mut().find(|c| c.name() == server_name); + let conn = match conn { + Some(c) => c, + None => { + warn!( + request_id = call.request_id, + tool = %call.tool_name, + agent = %resolved_agent_id_string, + server = %server_name, + "bridge IPC (upstream MCP): server allowlisted but not connected" + ); + return CallResult::Error { + message: format!( + "MCP server '{server_name}' is allowlisted for agent '{resolved_agent_id_string}' but not currently connected" + ), + }; + } + }; + conn.call_tool(&call.tool_name, &call.args).await + }; + + match result_text { + Ok(content) => { + // Truncation: keep the framed response well under + // MAX_FRAME_BYTES so the JSON envelope (request_id, + // result tag, is_error, escapes) fits comfortably. + // Margin is conservative on purpose. + const CONTENT_BUDGET: usize = MAX_FRAME_BYTES.saturating_sub(16 * 1024); + if content.len() > CONTENT_BUDGET { + warn!( + request_id = call.request_id, + tool = %call.tool_name, + agent = %resolved_agent_id_string, + bytes = content.len(), + budget = CONTENT_BUDGET, + "bridge IPC (upstream MCP): truncating oversized result" + ); + // UTF-8 invariant: each `char` encodes to ≤ 4 bytes, so + // `chars().take(N)` produces a `String` of ≤ 4N bytes. + // Taking `CONTENT_BUDGET / 4` chars therefore yields a + // string of ≤ CONTENT_BUDGET bytes, fitting the frame. + let mut truncated: String = content.chars().take(CONTENT_BUDGET / 4).collect(); + truncated.push_str( + " + +[openfang: upstream MCP result truncated — exceeded bridge frame budget]", + ); + CallResult::Ok { + content: truncated, + is_error: true, + } + } else { + CallResult::Ok { + content, + is_error: false, + } + } + } + Err(e) => { + // Distinguish runtime tool errors from dispatch failures + // the same way the built-in path does: a tool that ran + // and reported error is `Ok { is_error: true }`; we + // can't easily tell the difference from rmcp's surface + // here, so we surface as `Ok { is_error: true }` to let + // the model see the message rather than killing the + // call frame. + warn!( + request_id = call.request_id, + tool = %call.tool_name, + agent = %resolved_agent_id_string, + error = %e, + "bridge IPC (upstream MCP): tool call failed" + ); + CallResult::Ok { + content: format!("upstream MCP tool call failed: {e}"), + is_error: true, + } + } + } +} + +/// Handle a `ListUpstream` request: enumerate the upstream MCP tools the +/// authenticated agent is allowed to invoke. +/// +/// Same security model as [`dispatch_upstream_mcp_call`]: +/// - Hardened-token lane only. +/// - Per-agent `mcp_servers` allowlist with default-deny. +/// - Empty allowlist → empty tool list (not an error; the bridge will +/// simply advertise no upstream tools to Claude Code). +async fn handle_list_upstream( + req: &ListUpstreamRequest, + kernel: &Arc, + authenticated_agent_id: Option<&AgentId>, +) -> UpstreamListResponse { + let authed = match authenticated_agent_id { + Some(a) => a, + None => { + warn!( + request_id = req.request_id, + "bridge IPC: refusing list_upstream on legacy token lane" + ); + return UpstreamListResponse { + request_id: req.request_id, + result: UpstreamListResult::Error { + message: "upstream MCP listing requires a daemon-issued (hex) auth token; legacy lane refused" + .to_string(), + }, + }; + } + }; + + let resolved_agent_id = *authed; + let resolved_agent_id_string = resolved_agent_id.to_string(); + + let entry = match kernel.registry.get(resolved_agent_id) { + Some(e) => e, + None => { + warn!( + request_id = req.request_id, + agent = %resolved_agent_id_string, + "bridge IPC: list_upstream — no registry entry for resolved agent" + ); + return UpstreamListResponse { + request_id: req.request_id, + result: UpstreamListResult::Error { + message: format!("agent '{resolved_agent_id_string}' has no registry entry"), + }, + }; + } + }; + + // Empty allowlist → empty list (advertise nothing). This is the + // natural representation of `mcp_servers = []` with the new + // default-deny semantic. + if entry.manifest.mcp_servers.is_empty() { + return UpstreamListResponse { + request_id: req.request_id, + result: UpstreamListResult::Ok { tools: Vec::new() }, + }; + } + + let allowlist: std::collections::HashSet<&str> = entry + .manifest + .mcp_servers + .iter() + .map(|s| s.as_str()) + .collect(); + + let conns = kernel.mcp_connections.lock().await; + let mut tools: Vec = Vec::new(); + for conn in conns.iter() { + let server_name = conn.name(); + if !allowlist.contains(server_name) { + continue; + } + for tool in conn.tools() { + tools.push(UpstreamToolDef { + name: tool.name.clone(), + server: server_name.to_string(), + description: if tool.description.is_empty() { + None + } else { + Some(tool.description.clone()) + }, + input_schema: tool.input_schema.clone(), + }); + } + } + + UpstreamListResponse { + request_id: req.request_id, + result: UpstreamListResult::Ok { tools }, + } +} + /// Authenticate the bridge's Hello against the daemon's [`BridgeAuthority`]. /// /// Decision tree: @@ -809,37 +1155,216 @@ mod tests { Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(()), Err(e) => return Err(e), }; - let call = match frame { - Frame::Call(c) => c, - _ => continue, - }; - - // Mirror production allowlist logic. - let result = if !ALLOWED_TOOLS.iter().any(|t| *t == call.tool_name) { - CallResult::Error { - message: format!( - "tool '{}' not in bridge allowlist (permitted: {:?})", - call.tool_name, ALLOWED_TOOLS - ), + match frame { + Frame::Call(call) => { + // Mirror production allowlist + mcp_* early-branch logic. + let result = if openfang_runtime::mcp::is_mcp_tool(&call.tool_name) { + // Twin can't reach real mcp_connections; canned + // upstream-style ok lets list+invoke round-trip in tests. + CallResult::Ok { + content: format!( + "[test-twin canned upstream ok for {}]", + call.tool_name + ), + is_error: false, + } + } else if !ALLOWED_TOOLS.iter().any(|t| *t == call.tool_name) { + CallResult::Error { + message: format!( + "tool '{}' not in bridge allowlist (permitted: {:?})", + call.tool_name, ALLOWED_TOOLS + ), + } + } else { + // Canned Ok stand-in for `execute_tool` — kernel-free tests + // can't exercise the real dispatch path. + CallResult::Ok { + content: format!("[test-twin canned ok for {}]", call.tool_name), + is_error: false, + } + }; + + codec::write_frame( + &mut write_half, + &Frame::Response(CallResponse { + request_id: call.request_id, + result, + }), + ) + .await?; } - } else { - // Canned Ok stand-in for `execute_tool` — kernel-free tests - // can't exercise the real dispatch path. - CallResult::Ok { - content: format!("[test-twin canned ok for {}]", call.tool_name), - is_error: false, + Frame::ListUpstream(req) => { + // Canned upstream-tools list. Real handler walks + // `kernel.mcp_connections`; twin returns a fixed + // shape so wire round-trip can be exercised. + let response = UpstreamListResponse { + request_id: req.request_id, + result: UpstreamListResult::Ok { + tools: vec![UpstreamToolDef { + name: "mcp_twinsrv_ping".to_string(), + server: "twinsrv".to_string(), + description: Some("canned twin tool".to_string()), + input_schema: serde_json::json!({"type": "object"}), + }], + }, + }; + codec::write_frame(&mut write_half, &Frame::UpstreamList(response)).await?; } - }; + _ => continue, + } + } + } - codec::write_frame( - &mut write_half, - &Frame::Response(CallResponse { - request_id: call.request_id, - result, - }), - ) - .await?; + #[tokio::test] + async fn ipc_list_upstream_roundtrip() { + // End-to-end: handshake, send ListUpstream, expect UpstreamList + // response with the twin's canned tool. Locks the wire shape of + // the new variants and ensures the request loop dispatches them + // alongside CallRequest without breaking the existing path. + let tmp = tempfile::tempdir().unwrap(); + let sock = tmp.path().join("bridge.sock"); + let listener = UnixListener::bind(&sock).unwrap(); + + let authority = BridgeAuthority::new(); + let agent_id = AgentId::new(); + let guard = authority.issue(agent_id); + let presented_token = guard.token().to_hex(); + + let server_authority = authority.clone(); + let server = tokio::spawn(async move { + let (stream, _) = listener.accept().await.unwrap(); + handle_connection_test_twin(stream, server_authority) + .await + .unwrap(); + }); + + let mut client = ClientStream::connect(&sock).await.unwrap(); + let (cr, mut cw) = client.split(); + let mut cr = BufReader::new(cr); + + let hello = Frame::Hello(Hello { + protocol_version: PROTOCOL_VERSION, + token: presented_token, + bridge_version: "test".into(), + }); + codec::write_frame(&mut cw, &hello).await.unwrap(); + match codec::read_frame(&mut cr).await.unwrap() { + Frame::HelloAck(HelloAck::Ok { .. }) => {} + other => panic!("expected HelloAck::Ok, got {other:?}"), } + + codec::write_frame( + &mut cw, + &Frame::ListUpstream(ListUpstreamRequest { request_id: 42 }), + ) + .await + .unwrap(); + match codec::read_frame(&mut cr).await.unwrap() { + Frame::UpstreamList(UpstreamListResponse { + request_id: 42, + result: UpstreamListResult::Ok { tools }, + }) => { + assert_eq!(tools.len(), 1, "twin advertises one canned tool"); + assert_eq!(tools[0].name, "mcp_twinsrv_ping"); + assert_eq!(tools[0].server, "twinsrv"); + } + other => panic!("unexpected response to ListUpstream: {other:?}"), + } + + // Confirm the request loop still handles a Call after a + // ListUpstream (no state corruption between message kinds). + codec::write_frame( + &mut cw, + &Frame::Call(CallRequest { + request_id: 43, + agent_id: agent_id.to_string(), + tool_name: "file_read".into(), + args: serde_json::json!({"path": "x"}), + }), + ) + .await + .unwrap(); + match codec::read_frame(&mut cr).await.unwrap() { + Frame::Response(CallResponse { + request_id: 43, + result: CallResult::Ok { + is_error: false, .. + }, + }) => {} + other => panic!("unexpected response after ListUpstream: {other:?}"), + } + + drop(client); + server.await.unwrap(); + drop(guard); + assert_eq!(authority.live_spawn_count(), 0); + } + + #[tokio::test] + async fn ipc_mcp_call_through_twin_returns_canned_ok() { + // The twin's mcp_* branch returns a canned ok rather than the + // allowlist error, exercising the production early-branch shape: + // mcp_* tools bypass the static ALLOWED_TOOLS gate. + let tmp = tempfile::tempdir().unwrap(); + let sock = tmp.path().join("bridge.sock"); + let listener = UnixListener::bind(&sock).unwrap(); + + let authority = BridgeAuthority::new(); + let agent_id = AgentId::new(); + let guard = authority.issue(agent_id); + let presented_token = guard.token().to_hex(); + + let server_authority = authority.clone(); + let server = tokio::spawn(async move { + let (stream, _) = listener.accept().await.unwrap(); + handle_connection_test_twin(stream, server_authority) + .await + .unwrap(); + }); + + let mut client = ClientStream::connect(&sock).await.unwrap(); + let (cr, mut cw) = client.split(); + let mut cr = BufReader::new(cr); + + let hello = Frame::Hello(Hello { + protocol_version: PROTOCOL_VERSION, + token: presented_token, + bridge_version: "test".into(), + }); + codec::write_frame(&mut cw, &hello).await.unwrap(); + match codec::read_frame(&mut cr).await.unwrap() { + Frame::HelloAck(HelloAck::Ok { .. }) => {} + other => panic!("expected HelloAck::Ok, got {other:?}"), + } + + codec::write_frame( + &mut cw, + &Frame::Call(CallRequest { + request_id: 7, + agent_id: agent_id.to_string(), + tool_name: "mcp_linear_getteams".into(), + args: serde_json::json!({}), + }), + ) + .await + .unwrap(); + match codec::read_frame(&mut cr).await.unwrap() { + Frame::Response(CallResponse { + request_id: 7, + result: + CallResult::Ok { + content, + is_error: false, + }, + }) => { + assert!(content.contains("mcp_linear_getteams")); + } + other => panic!("unexpected response to mcp_* call: {other:?}"), + } + + drop(client); + server.await.unwrap(); + drop(guard); } #[test] @@ -1147,4 +1672,45 @@ mod tests { "legacy path has no fingerprint" ); } + + /// Drift detection: `RESERVED_BUILTIN_NAMES` (the collision-check list + /// used at MCP discovery in `openfang-runtime`) MUST equal + /// `ALLOWED_TOOLS` (this crate). If a new built-in tool is added to + /// `ALLOWED_TOOLS` and `built_in_tools()` but the reservation list + /// drifts, an upstream MCP server could shadow the new built-in. + /// + /// Owner contract: any addition to `ALLOWED_TOOLS` must also be added + /// to `openfang_runtime::mcp::RESERVED_BUILTIN_NAMES`. + #[test] + fn reserved_builtin_names_matches_allowed_tools() { + use openfang_runtime::mcp::RESERVED_BUILTIN_NAMES; + use std::collections::BTreeSet; + let allowed: BTreeSet<&str> = ALLOWED_TOOLS.iter().copied().collect(); + let reserved: BTreeSet<&str> = RESERVED_BUILTIN_NAMES.iter().copied().collect(); + assert_eq!( + allowed, reserved, + "drift: openfang_runtime::mcp::RESERVED_BUILTIN_NAMES ≠ ALLOWED_TOOLS. \ + Built-ins added to ALLOWED_TOOLS must also be added to \ + RESERVED_BUILTIN_NAMES so upstream MCP servers cannot shadow them." + ); + } + /// Drift catcher: no built-in tool may use the `mcp_` prefix. + /// + /// The dispatch gate `is_mcp_tool(name) = name.starts_with("mcp_")` + /// in `openfang_runtime::mcp` short-circuits BEFORE the static + /// `ALLOWED_TOOLS` check at the top of `dispatch_tool_call`. If a + /// future built-in were named `mcp_*`, calls to it would route to + /// `dispatch_upstream_mcp_call` instead of the built-in's real + /// handler. Structurally impossible today; this test locks the + /// invariant so a future addition doesn't silently subvert dispatch. + #[test] + fn no_builtin_uses_mcp_prefix() { + for name in ALLOWED_TOOLS { + assert!( + !name.starts_with("mcp_"), + "built-in '{name}' uses 'mcp_' prefix; conflicts with \ + is_mcp_tool dispatch gate in openfang_runtime::mcp" + ); + } + } } diff --git a/crates/openfang-mcp-bridge/src/lib.rs b/crates/openfang-mcp-bridge/src/lib.rs index 4faf33641..8428ad669 100644 --- a/crates/openfang-mcp-bridge/src/lib.rs +++ b/crates/openfang-mcp-bridge/src/lib.rs @@ -41,6 +41,8 @@ use std::sync::Arc; use rmcp::{model::*, service::RequestContext, ErrorData as McpError, ServerHandler}; +use crate::protocol::UpstreamToolDef; + /// Narrow seam between the bridge and the OpenFang runtime. /// /// The runtime (or, in the real topology, an IPC-backed adapter that talks @@ -66,6 +68,19 @@ pub trait ToolDispatcher: Send + Sync { /// it from `agent.toml`. fn allowed_tools(&self) -> Vec; + /// Forwarded upstream MCP tools the daemon told us this agent may + /// invoke (from `agent.toml mcp_servers`, resolved server-side at + /// list-upstream time). Default: none. + /// + /// Names are already namespaced (`mcp_{server}_{tool}`) and are + /// advertised by the bridge in addition to the built-in surface + /// without going through [`Self::allowed_tools`] — that field gates + /// the OpenFang built-in slice; upstream gating is handled + /// server-side by the daemon against the agent's MCP allowlist. + fn upstream_tools(&self) -> Vec { + Vec::new() + } + /// Invoke a tool by name with a JSON argument blob. The dispatcher is /// responsible for capability enforcement; the bridge MUST NOT assume the /// caller is trusted. @@ -475,14 +490,47 @@ impl Bridge { } /// Tools the bridge will both advertise and accept calls for, given the - /// dispatcher's allowed set. + /// dispatcher's allowed set, plus any daemon-forwarded upstream MCP tools. fn permitted_tools(&self) -> Vec { let allowed = self.dispatcher.allowed_tools(); - built_in_tools() + let mut tools: Vec = built_in_tools() .into_iter() .filter(|t| allowed.iter().any(|a| a.as_str() == t.name.as_ref())) - .collect() + .collect(); + // Append upstream MCP tools. NOT gated by `allowed_tools()` — + // server-side `agent.toml mcp_servers` gating already happened + // when the daemon answered `ListUpstream`. Name collisions with + // built-ins are refused at MCP discovery in the daemon, so we + // trust the names here. + for def in self.dispatcher.upstream_tools() { + tools.push(upstream_def_to_tool(def)); + } + tools } + + /// True if `name` is among the advertised upstream MCP tools. + fn is_advertised_upstream(&self, name: &str) -> bool { + self.dispatcher + .upstream_tools() + .into_iter() + .any(|t| t.name == name) + } +} + +/// Convert a daemon-forwarded upstream MCP tool definition into the +/// rmcp `Tool` shape advertised on the MCP wire. Description defaults +/// to an empty string when absent; input schema is taken verbatim, or +/// substituted with an empty object if the upstream sent a non-object. +fn upstream_def_to_tool(def: UpstreamToolDef) -> Tool { + let schema_map = match def.input_schema { + serde_json::Value::Object(m) => m, + _ => serde_json::Map::new(), + }; + Tool::new( + def.name, + def.description.unwrap_or_default(), + std::sync::Arc::new(schema_map), + ) } impl ServerHandler for Bridge { @@ -518,10 +566,21 @@ impl ServerHandler for Bridge { .map(serde_json::Value::Object) .unwrap_or(serde_json::Value::Null); - // Defense-in-depth: re-check the allowlist before crossing the seam. - // The dispatcher will enforce again; that's intentional. + // Defense-in-depth: re-check before crossing the seam. The + // dispatcher will enforce again; that's intentional. + // + // Two paths: + // - Built-in tools: must appear in `allowed_tools()` (the + // `OPENFANG_BRIDGE_ALLOWED` / `DEFAULT_ALLOWED` gate). + // - Upstream `mcp_*` tools: must have been advertised by + // the daemon at list-upstream time. The daemon also + // enforces `agent.toml mcp_servers` server-side on + // dispatch. let allowed = self.dispatcher.allowed_tools(); - if !allowed.iter().any(|a| a == tool_name) { + let is_builtin_allowed = allowed.iter().any(|a| a == tool_name); + let is_advertised_upstream = + tool_name.starts_with("mcp_") && self.is_advertised_upstream(tool_name); + if !is_builtin_allowed && !is_advertised_upstream { return Ok(CallToolResult::error(vec![Content::text(format!( "tool '{tool_name}' not permitted for this agent" ))])); @@ -563,9 +622,21 @@ mod tests { struct StubDispatcher { agent: String, allowed: Vec, + upstream: Vec, canned: DispatchOk, } + impl StubDispatcher { + fn new(agent: &str, allowed: Vec, canned: DispatchOk) -> Self { + Self { + agent: agent.into(), + allowed, + upstream: Vec::new(), + canned, + } + } + } + #[async_trait::async_trait] impl ToolDispatcher for StubDispatcher { fn agent_id(&self) -> &str { @@ -574,6 +645,9 @@ mod tests { fn allowed_tools(&self) -> Vec { self.allowed.clone() } + fn upstream_tools(&self) -> Vec { + self.upstream.clone() + } async fn call( &self, _tool_name: &str, @@ -615,16 +689,16 @@ mod tests { #[test] fn permitted_tools_intersects_with_dispatcher_allowed() { - let bridge = Bridge::new(Arc::new(StubDispatcher { - agent: "a".into(), + let bridge = Bridge::new(Arc::new(StubDispatcher::new( + "a", // Dispatcher permits only file_read of the built-in slice; // not_a_real_tool is unknown to the bridge and must be ignored. - allowed: vec!["file_read".into(), "not_a_real_tool".into()], - canned: DispatchOk { + vec!["file_read".into(), "not_a_real_tool".into()], + DispatchOk { content: String::new(), is_error: false, }, - })); + ))); let names: Vec = bridge .permitted_tools() .into_iter() @@ -632,4 +706,67 @@ mod tests { .collect(); assert_eq!(names, vec!["file_read".to_string()]); } + + #[test] + fn permitted_tools_appends_upstream_after_builtins() { + let mut stub = StubDispatcher::new( + "a", + vec!["file_read".into()], + DispatchOk { + content: String::new(), + is_error: false, + }, + ); + stub.upstream = vec![ + UpstreamToolDef { + name: "mcp_linear_getteams".into(), + server: "linear".into(), + description: Some("List teams".into()), + input_schema: serde_json::json!({"type":"object"}), + }, + UpstreamToolDef { + name: "mcp_notion_search".into(), + server: "notion".into(), + description: None, + input_schema: serde_json::json!({}), + }, + ]; + let bridge = Bridge::new(Arc::new(stub)); + let names: Vec = bridge + .permitted_tools() + .into_iter() + .map(|t| t.name.into_owned()) + .collect(); + // Built-in first, then upstream, preserving order. + assert_eq!( + names, + vec![ + "file_read".to_string(), + "mcp_linear_getteams".to_string(), + "mcp_notion_search".to_string(), + ], + ); + } + + #[test] + fn is_advertised_upstream_matches_only_advertised_names() { + let mut stub = StubDispatcher::new( + "a", + vec![], + DispatchOk { + content: String::new(), + is_error: false, + }, + ); + stub.upstream = vec![UpstreamToolDef { + name: "mcp_linear_getteams".into(), + server: "linear".into(), + description: None, + input_schema: serde_json::json!({}), + }]; + let bridge = Bridge::new(Arc::new(stub)); + assert!(bridge.is_advertised_upstream("mcp_linear_getteams")); + assert!(!bridge.is_advertised_upstream("mcp_linear_unknown")); + assert!(!bridge.is_advertised_upstream("file_read")); + } } diff --git a/crates/openfang-mcp-bridge/src/main.rs b/crates/openfang-mcp-bridge/src/main.rs index 8fdf21674..e315c96f6 100644 --- a/crates/openfang-mcp-bridge/src/main.rs +++ b/crates/openfang-mcp-bridge/src/main.rs @@ -57,8 +57,8 @@ use anyhow::{anyhow, bail, Context, Result}; #[cfg(unix)] use openfang_mcp_bridge::{ protocol::{ - codec, CallRequest, CallResult, Frame, Hello, HelloAck, PROTOCOL_VERSION, SOCKET_ENV_VAR, - TOKEN_ENV_VAR, + codec, CallRequest, CallResult, Frame, Hello, HelloAck, ListUpstreamRequest, + UpstreamListResult, UpstreamToolDef, PROTOCOL_VERSION, SOCKET_ENV_VAR, TOKEN_ENV_VAR, }, Bridge, DispatchOk, ToolDispatchError, ToolDispatcher, DEFAULT_ALLOWED, }; @@ -126,8 +126,30 @@ async fn main() -> Result<()> { handshake(&mut stream, &token).await?; + // --- List upstream MCP tools (best-effort) --- + // + // Round-trip happens here, with sole ownership of the stream, so + // it is sequenced before the actor takes the read/write halves. + // A protocol-level error from the daemon (agent identity not + // resolvable, registry unavailable) is logged but does NOT abort + // the bridge: built-in tools still work; the agent simply sees + // no `mcp_*` surface this session. + let upstream_tools = list_upstream(&mut stream).await.unwrap_or_else(|e| { + tracing::warn!(error = %e, "list_upstream failed; bridge continues without upstream MCP surface"); + Vec::new() + }); + tracing::info!( + count = upstream_tools.len(), + "upstream MCP tools advertised" + ); + // --- Spawn IPC actor --- - let dispatcher = spawn_ipc_actor(stream, agent_id.clone(), allowed_tools.clone()); + let dispatcher = spawn_ipc_actor( + stream, + agent_id.clone(), + allowed_tools.clone(), + upstream_tools, + ); // --- Run MCP server over stdio --- let service = Bridge::new(Arc::new(dispatcher)) @@ -169,6 +191,54 @@ async fn handshake(stream: &mut UnixStream, token: &str) -> Result<()> { } } +/// Maximum time to wait for the per-agent upstream MCP tool list from +/// the daemon. Bounds bridge startup so a wedged daemon or a slow +/// upstream MCP server (`mcp_connections` lock held during a slow +/// `tools/list`) cannot stall every new bridge session indefinitely. +/// On timeout the caller downgrades to "no upstream surface this +/// session" — same failure mode as a protocol-level refusal. +#[cfg(unix)] +const LIST_UPSTREAM_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(15); + +/// Bridge → daemon: ask for the per-agent upstream MCP tool list. +/// +/// Sent once, immediately after a successful handshake while the +/// caller still owns the stream end-to-end. Returns the advertised +/// upstream tools (possibly empty) on success, or an error that the +/// caller may downgrade to "no upstream surface this session". +/// +/// Bounded by [`LIST_UPSTREAM_TIMEOUT`]. +#[cfg(unix)] +async fn list_upstream(stream: &mut UnixStream) -> Result> { + tokio::time::timeout(LIST_UPSTREAM_TIMEOUT, list_upstream_inner(stream)) + .await + .map_err(|_| anyhow!("list_upstream timed out after {:?}", LIST_UPSTREAM_TIMEOUT))? +} + +#[cfg(unix)] +async fn list_upstream_inner(stream: &mut UnixStream) -> Result> { + let (read_half, mut write_half) = stream.split(); + let mut read_half = BufReader::new(read_half); + + let req = Frame::ListUpstream(ListUpstreamRequest { request_id: 0 }); + codec::write_frame(&mut write_half, &req) + .await + .context("write ListUpstream")?; + + match codec::read_frame(&mut read_half) + .await + .context("read UpstreamList")? + { + Frame::UpstreamList(resp) => match resp.result { + UpstreamListResult::Ok { tools } => Ok(tools), + UpstreamListResult::Error { message } => { + Err(anyhow!("daemon refused list_upstream: {message}")) + } + }, + other => Err(anyhow!("expected UpstreamList, got {other:?}")), + } +} + /// One pending request: the slot the actor will fill when its response frame /// arrives over the wire. #[cfg(unix)] @@ -188,6 +258,11 @@ struct IpcRequest { pub struct IpcDispatcher { agent_id: String, allowed: Vec, + /// Cached snapshot of upstream MCP tools advertised by the daemon + /// at session start (one-shot `ListUpstream` round-trip). Refreshes + /// require restarting the bridge — by design for now, since the + /// daemon also restarts agents on `agent.toml mcp_servers` changes. + upstream: Vec, tx: mpsc::Sender, next_id: AtomicU64, } @@ -203,12 +278,27 @@ impl ToolDispatcher for IpcDispatcher { self.allowed.clone() } + fn upstream_tools(&self) -> Vec { + self.upstream.clone() + } + async fn call( &self, tool_name: &str, args: serde_json::Value, ) -> Result { - if !self.allowed.iter().any(|a| a == tool_name) { + // Two gates here: + // - Built-in tools must be in `OPENFANG_BRIDGE_ALLOWED` / + // `DEFAULT_ALLOWED`. + // - `mcp_*` upstream tools must have been advertised by the + // daemon at list-upstream time. Server-side `mcp_servers` + // gating in the daemon is the source of truth; this is + // just an early-exit hygiene check so we never ship a + // bogus `mcp_*` name across the wire. + let is_builtin_allowed = self.allowed.iter().any(|a| a == tool_name); + let is_advertised_upstream = + tool_name.starts_with("mcp_") && self.upstream.iter().any(|t| t.name == tool_name); + if !is_builtin_allowed && !is_advertised_upstream { return Err(ToolDispatchError::NotPermitted(tool_name.to_string())); } @@ -256,6 +346,7 @@ pub fn spawn_ipc_actor( stream: UnixStream, agent_id: String, allowed: Vec, + upstream: Vec, ) -> IpcDispatcher { let (tx, mut rx) = mpsc::channel::(32); let pending: PendingMap = Arc::new(Mutex::new(HashMap::new())); @@ -343,6 +434,7 @@ pub fn spawn_ipc_actor( IpcDispatcher { agent_id, allowed, + upstream, tx, next_id: AtomicU64::new(1), } @@ -445,6 +537,7 @@ mod tests { client, "agent-x".into(), vec!["file_read".into(), "file_list".into()], + Vec::new(), ); // Concurrent calls — exercise the correlation map. @@ -470,4 +563,178 @@ mod tests { let _ = server.await; } + + /// End-to-end list_upstream + mcp_* dispatch: + /// - fake daemon answers Hello/HelloAck, + /// - then responds to ListUpstream with a one-tool catalog, + /// - bridge dispatcher gets that cache, + /// - a `mcp_linear_getteams` call passes the bridge-side gate + /// (NOT in `allowed`) and round-trips back with the tool name. + /// - a `mcp_unknown_tool` call (not in upstream cache) is + /// rejected client-side with NotPermitted. + #[tokio::test] + async fn list_upstream_handshake_and_mcp_dispatch() { + use openfang_mcp_bridge::protocol::{ + CallResponse, UpstreamListResponse, UpstreamListResult, + }; + + let tmp = tempfile::tempdir().unwrap(); + let sock = tmp.path().join("bridge.sock"); + let listener = UnixListener::bind(&sock).unwrap(); + + let server = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let (rh, mut wh) = stream.split(); + let mut rh = BufReader::new(rh); + // Hello. + match codec::read_frame(&mut rh).await.unwrap() { + Frame::Hello(_) => {} + _ => panic!("expected Hello"), + } + codec::write_frame( + &mut wh, + &Frame::HelloAck(HelloAck::Ok { + daemon_version: "test".into(), + }), + ) + .await + .unwrap(); + // ListUpstream. + let req = match codec::read_frame(&mut rh).await.unwrap() { + Frame::ListUpstream(r) => r, + other => panic!("expected ListUpstream, got {other:?}"), + }; + codec::write_frame( + &mut wh, + &Frame::UpstreamList(UpstreamListResponse { + request_id: req.request_id, + result: UpstreamListResult::Ok { + tools: vec![UpstreamToolDef { + name: "mcp_linear_getteams".into(), + server: "linear".into(), + description: Some("List Linear teams".into()), + input_schema: serde_json::json!({"type":"object"}), + }], + }, + }), + ) + .await + .unwrap(); + // One mcp_* Call → echo tool_name back as Ok content. + let call = match codec::read_frame(&mut rh).await.unwrap() { + Frame::Call(c) => c, + other => panic!("expected Call, got {other:?}"), + }; + codec::write_frame( + &mut wh, + &Frame::Response(CallResponse { + request_id: call.request_id, + result: CallResult::Ok { + content: call.tool_name, + is_error: false, + }, + }), + ) + .await + .unwrap(); + }); + + let mut client = UnixStream::connect(&sock).await.unwrap(); + + // Inline handshake. + { + let (rh, mut wh) = client.split(); + let mut rh = BufReader::new(rh); + codec::write_frame( + &mut wh, + &Frame::Hello(Hello { + protocol_version: PROTOCOL_VERSION, + token: "t".into(), + bridge_version: "test".into(), + }), + ) + .await + .unwrap(); + match codec::read_frame(&mut rh).await.unwrap() { + Frame::HelloAck(HelloAck::Ok { .. }) => {} + other => panic!("bad ack: {other:?}"), + } + } + + // list_upstream round-trip via the production helper. + let upstream = list_upstream(&mut client).await.expect("list_upstream ok"); + assert_eq!(upstream.len(), 1); + assert_eq!(upstream[0].name, "mcp_linear_getteams"); + + let dispatcher = spawn_ipc_actor( + client, + "agent-x".into(), + // mcp_* is NOT in the built-in allowlist — only the + // upstream cache should grant it. + vec!["file_read".into()], + upstream, + ); + + // Allowed via upstream cache. + let ok = dispatcher + .call("mcp_linear_getteams", serde_json::json!({})) + .await + .expect("mcp dispatch ok"); + assert_eq!(ok.content, "mcp_linear_getteams"); + assert!(!ok.is_error); + + // Not in upstream cache, not in allowed — must be denied locally. + let denied = dispatcher + .call("mcp_linear_notallowed", serde_json::json!({})) + .await + .expect_err("unadvertised mcp_* tool must be denied"); + match denied { + ToolDispatchError::NotPermitted(t) => assert_eq!(t, "mcp_linear_notallowed"), + other => panic!("expected NotPermitted, got {other:?}"), + } + + let _ = server.await; + } + + /// list_upstream() surfaces protocol-layer errors as Err — the + /// production caller in main() downgrades these to "no upstream + /// surface this session" via unwrap_or_else. + #[tokio::test] + async fn list_upstream_propagates_error_result() { + use openfang_mcp_bridge::protocol::{UpstreamListResponse, UpstreamListResult}; + + let tmp = tempfile::tempdir().unwrap(); + let sock = tmp.path().join("bridge.sock"); + let listener = UnixListener::bind(&sock).unwrap(); + + let server = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let (rh, mut wh) = stream.split(); + let mut rh = BufReader::new(rh); + // Skip Hello (the test doesn't perform it; client below + // sends ListUpstream directly). + let req = match codec::read_frame(&mut rh).await.unwrap() { + Frame::ListUpstream(r) => r, + other => panic!("expected ListUpstream, got {other:?}"), + }; + codec::write_frame( + &mut wh, + &Frame::UpstreamList(UpstreamListResponse { + request_id: req.request_id, + result: UpstreamListResult::Error { + message: "agent identity not resolvable".into(), + }, + }), + ) + .await + .unwrap(); + }); + + let mut client = UnixStream::connect(&sock).await.unwrap(); + let err = list_upstream(&mut client) + .await + .expect_err("Error result must propagate"); + assert!(err.to_string().contains("not resolvable")); + let _ = server.await; + } } diff --git a/crates/openfang-mcp-bridge/src/protocol.rs b/crates/openfang-mcp-bridge/src/protocol.rs index cadb8d321..0679f4193 100644 --- a/crates/openfang-mcp-bridge/src/protocol.rs +++ b/crates/openfang-mcp-bridge/src/protocol.rs @@ -109,6 +109,65 @@ pub enum CallResult { Error { message: String }, } +/// Bridge → daemon: request the list of upstream MCP tools the calling +/// agent is permitted to invoke. +/// +/// Sent once per session, immediately after a successful [`Hello`]/[`HelloAck::Ok`] +/// handshake. The daemon answers with [`UpstreamListResponse`]. +/// +/// Identity is taken from the authenticated [`Hello::token`] (resolved +/// server-side to an `agent_id`); the bridge does not name the agent in +/// this frame. Per-agent gating is enforced server-side against the +/// agent's `agent.toml mcp_servers` allowlist. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ListUpstreamRequest { + /// Caller-assigned correlation id. Daemon echoes in + /// [`UpstreamListResponse::request_id`]. + pub request_id: u64, +} + +/// One forwarded upstream MCP tool, as advertised to the bridge. +/// +/// The `name` is already namespaced (`mcp_{server}_{tool}`) by the daemon +/// at MCP-discovery time; the bridge surfaces this name verbatim in its +/// own `tools/list` and routes invocations of the same name back across +/// the IPC. +/// +/// `input_schema` is the upstream server's JSON Schema for the tool, passed +/// through opaquely — the daemon does not validate it. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UpstreamToolDef { + /// Already-namespaced tool name (e.g. `mcp_linear_getteams`). + pub name: String, + /// Logical server name from `config.toml` (e.g. `linear`). Used by + /// the bridge for grouping/debug; not parsed from `name` on receive. + pub server: String, + /// Human-readable description, forwarded from the upstream MCP server. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub description: Option, + /// Opaque JSON Schema for the tool's input. Forwarded verbatim. + pub input_schema: serde_json::Value, +} + +/// Daemon → bridge: response to [`ListUpstreamRequest`]. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UpstreamListResponse { + pub request_id: u64, + pub result: UpstreamListResult, +} + +/// Outcome of an upstream tool-list dispatch. +/// +/// `Error` is reserved for protocol-layer failures (agent identity not +/// resolvable, registry unavailable). An agent that simply has no upstream +/// servers configured receives `Ok { tools: vec![] }`. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum UpstreamListResult { + Ok { tools: Vec }, + Error { message: String }, +} + /// Top-level frame type, for connections that may carry multiple message kinds. /// /// At present we only multiplex Hello/HelloAck on connection start and @@ -122,6 +181,15 @@ pub enum Frame { HelloAck(HelloAck), Call(CallRequest), Response(CallResponse), + /// Bridge → daemon: request the per-agent upstream MCP tool list. + /// + /// Added in protocol v1 as a non-breaking variant; older builds that + /// predate this variant reject the frame as an unknown tag and close + /// the connection. Daemon and bridge ship from the same workspace, + /// so version skew here would indicate a build-system error. + ListUpstream(ListUpstreamRequest), + /// Daemon → bridge: response to [`Frame::ListUpstream`]. + UpstreamList(UpstreamListResponse), } #[cfg(feature = "ipc-codec")] @@ -221,4 +289,119 @@ mod tests { assert!(s.contains("rejected")); assert!(s.contains("bad token")); } + + #[test] + fn frame_roundtrip_list_upstream_request() { + let frame = Frame::ListUpstream(ListUpstreamRequest { request_id: 99 }); + let json = serde_json::to_string(&frame).unwrap(); + assert!(json.contains("list_upstream")); + let back: Frame = serde_json::from_str(&json).unwrap(); + match back { + Frame::ListUpstream(r) => assert_eq!(r.request_id, 99), + _ => panic!("wrong variant"), + } + } + + #[test] + fn frame_roundtrip_upstream_list_ok() { + let frame = Frame::UpstreamList(UpstreamListResponse { + request_id: 99, + result: UpstreamListResult::Ok { + tools: vec![UpstreamToolDef { + name: "mcp_linear_getteams".into(), + server: "linear".into(), + description: Some("List Linear teams".into()), + input_schema: serde_json::json!({ "type": "object" }), + }], + }, + }); + let json = serde_json::to_string(&frame).unwrap(); + assert!(json.contains("upstream_list")); + assert!(json.contains("mcp_linear_getteams")); + let back: Frame = serde_json::from_str(&json).unwrap(); + match back { + Frame::UpstreamList(r) => { + assert_eq!(r.request_id, 99); + match r.result { + UpstreamListResult::Ok { tools } => { + assert_eq!(tools.len(), 1); + assert_eq!(tools[0].server, "linear"); + } + _ => panic!("wrong result variant"), + } + } + _ => panic!("wrong frame variant"), + } + } + + #[test] + fn frame_roundtrip_upstream_list_empty() { + // Agent with no upstream MCP servers configured receives an empty + // Ok list, not an Error. Guards against the bridge treating + // "no servers" as a fatal handshake failure. + let frame = Frame::UpstreamList(UpstreamListResponse { + request_id: 1, + result: UpstreamListResult::Ok { tools: vec![] }, + }); + let json = serde_json::to_string(&frame).unwrap(); + let back: Frame = serde_json::from_str(&json).unwrap(); + if let Frame::UpstreamList(r) = back { + match r.result { + UpstreamListResult::Ok { tools } => assert!(tools.is_empty()), + _ => panic!("wrong result variant"), + } + } else { + panic!("wrong frame variant"); + } + } + + #[test] + fn frame_roundtrip_upstream_list_error() { + let frame = Frame::UpstreamList(UpstreamListResponse { + request_id: 7, + result: UpstreamListResult::Error { + message: "agent identity not resolvable".into(), + }, + }); + let json = serde_json::to_string(&frame).unwrap(); + assert!(json.contains("error")); + let back: Frame = serde_json::from_str(&json).unwrap(); + if let Frame::UpstreamList(r) = back { + match r.result { + UpstreamListResult::Error { message } => { + assert!(message.contains("not resolvable")); + } + _ => panic!("wrong result variant"), + } + } else { + panic!("wrong frame variant"); + } + } + + #[test] + fn upstream_tool_def_omits_none_description() { + let def = UpstreamToolDef { + name: "mcp_notion_search".into(), + server: "notion".into(), + description: None, + input_schema: serde_json::json!({}), + }; + let json = serde_json::to_string(&def).unwrap(); + // skip_serializing_if drops the field entirely when None. + assert!(!json.contains("description")); + // And it round-trips back as None. + let back: UpstreamToolDef = serde_json::from_str(&json).unwrap(); + assert!(back.description.is_none()); + } + + #[test] + fn frame_unknown_variant_is_rejected_cleanly() { + // Forward-compat guard: a frame with an unknown `type` tag must + // produce a serde error, not a panic. The IPC reader maps this + // to an io::Error and closes the connection — but the test here + // is just that decode fails without unwinding. + let json = r#"{"type":"future_kind","payload":{}}"#; + let result: Result = serde_json::from_str(json); + assert!(result.is_err(), "expected decode failure on unknown tag"); + } } diff --git a/crates/openfang-runtime/src/mcp.rs b/crates/openfang-runtime/src/mcp.rs index 13f590284..3afe17e8f 100644 --- a/crates/openfang-runtime/src/mcp.rs +++ b/crates/openfang-runtime/src/mcp.rs @@ -14,7 +14,7 @@ use rmcp::service::RunningService; use rmcp::{RoleClient, ServiceExt}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; -use tracing::{debug, info}; +use tracing::{debug, info, warn}; // --------------------------------------------------------------------------- // Configuration types @@ -125,6 +125,11 @@ impl McpConnection { } /// Discover available tools via `tools/list`. + /// + /// Performs collision checks against [`RESERVED_BUILTIN_NAMES`] and + /// against names already discovered from this same server. Conflicting + /// upstream tools are dropped with a `WARN` log — built-ins win, no + /// silent shadowing. async fn discover_tools(&mut self) -> Result<(), String> { let tools = self .client @@ -132,26 +137,21 @@ impl McpConnection { .await .map_err(|e| format!("Failed to list MCP tools: {e}"))?; - let server_name = &self.config.name; + let server_name = self.config.name.clone(); for tool in &tools { - let raw_name = &tool.name; - let description = tool.description.as_deref().unwrap_or(""); - + let raw_name = tool.name.to_string(); + let description = tool.description.as_deref().unwrap_or("").to_string(); let input_schema = serde_json::to_value(&tool.input_schema) .unwrap_or(serde_json::json!({"type": "object"})); - // Namespace: mcp_{server}_{tool} - let namespaced = format_mcp_tool_name(server_name, raw_name); - - // Store original name so we can send it back to the server - self.original_names - .insert(namespaced.clone(), raw_name.to_string()); - - self.tools.push(ToolDefinition { - name: namespaced, - description: format!("[MCP:{server_name}] {description}"), + register_discovered_tool( + &server_name, + &raw_name, + &description, input_schema, - }); + &mut self.tools, + &mut self.original_names, + ); } Ok(()) @@ -339,6 +339,90 @@ impl McpConnection { // Tool namespacing helpers // --------------------------------------------------------------------------- +/// OpenFang built-in tool names that MCP upstream tools must not shadow. +/// +/// This list MUST stay in sync with +/// [`openfang_api::bridge_ipc::ALLOWED_TOOLS`] and +/// [`openfang_mcp_bridge::built_in_tools`]. A drift test in +/// `openfang-api::bridge_ipc::tests` asserts equality with `ALLOWED_TOOLS` +/// and will fail CI if these lists diverge. +/// +/// The MCP namespacing scheme (`mcp_{server}_{tool}`) makes structural +/// collisions impossible today (no built-in starts with `mcp_`), but +/// enforcing this check is defense-in-depth: future built-ins could be +/// added, and an upstream server returning a crafted name like +/// `file_read` (without `mcp_` prefix) is already disqualified by +/// namespacing — yet the namespaced form is what we ultimately gate, so +/// we check that. +pub const RESERVED_BUILTIN_NAMES: &[&str] = &[ + "file_read", + "file_list", + "file_write", + "create_directory", + "web_fetch", + "agent_list", + "channel_send", + "agent_send", + "agent_spawn", + "agent_kill", + "memory_store", + "memory_recall", + "agent_activate", + "agent_find", + "shell_exec", + "web_search", + "apply_patch", +]; + +/// True if `name` shadows an OpenFang built-in tool. +pub fn is_reserved_builtin(name: &str) -> bool { + RESERVED_BUILTIN_NAMES.contains(&name) +} + +/// Register a single discovered upstream tool, applying collision checks. +/// +/// Skips (with `WARN` log) if the namespaced name shadows an OpenFang +/// built-in or duplicates a tool already registered for this server. +/// Extracted from [`McpConnection::discover_tools`] so the gating logic +/// is unit-testable without standing up a live MCP transport. +pub(crate) fn register_discovered_tool( + server_name: &str, + raw_name: &str, + description: &str, + input_schema: serde_json::Value, + tools: &mut Vec, + original_names: &mut HashMap, +) { + let namespaced = format_mcp_tool_name(server_name, raw_name); + + if is_reserved_builtin(&namespaced) { + warn!( + server = %server_name, + tool = %raw_name, + namespaced = %namespaced, + "refusing to register MCP tool: shadows OpenFang built-in" + ); + return; + } + + if tools.iter().any(|t| t.name == namespaced) { + warn!( + server = %server_name, + tool = %raw_name, + namespaced = %namespaced, + "refusing to register MCP tool: duplicate namespaced name from same server" + ); + return; + } + + original_names.insert(namespaced.clone(), raw_name.to_string()); + tools.push(ToolDefinition { + name: namespaced, + description: format!("[MCP:{server_name}] {description}"), + input_schema, + }); +} + /// Format a namespaced MCP tool name: `mcp_{server}_{tool}`. pub fn format_mcp_tool_name(server: &str, tool: &str) -> String { format!("mcp_{}_{}", normalize_name(server), normalize_name(tool)) @@ -538,4 +622,110 @@ mod tests { _ => panic!("Expected Http transport"), } } + + #[test] + fn reserved_builtin_names_includes_core_surface() { + assert!(is_reserved_builtin("file_read")); + assert!(is_reserved_builtin("shell_exec")); + assert!(is_reserved_builtin("apply_patch")); + assert!(!is_reserved_builtin("mcp_linear_getteams")); + assert!(!is_reserved_builtin("")); + } + + #[test] + fn register_discovered_tool_skips_builtin_shadow() { + // An upstream server literally named "file" returning a "read" tool + // would produce the namespaced name `mcp_file_read`, which is NOT a + // built-in. But if the reserved list ever expanded to include the + // namespaced form, the gate must hold. Probe with a synthetic case + // by inserting a built-in name into the test against a server whose + // normalization yields exactly that name. + // + // The realistic threat is a future built-in collision. Simulate it + // by checking a name we know is reserved today. + let mut tools = Vec::new(); + let mut names = HashMap::new(); + + // Direct probe: pretend the server returned a tool whose namespaced + // form lands on a reserved name. We cheat by constructing one whose + // server+tool normalize there. Easiest: assert the gate function + // directly, since the namespaced form is what's gated. + for reserved in RESERVED_BUILTIN_NAMES { + assert!( + is_reserved_builtin(reserved), + "{reserved} should be reserved" + ); + } + + // Realistic register call against a non-colliding name succeeds. + register_discovered_tool( + "linear", + "getteams", + "list teams", + serde_json::json!({"type": "object"}), + &mut tools, + &mut names, + ); + assert_eq!(tools.len(), 1); + assert_eq!(tools[0].name, "mcp_linear_getteams"); + assert_eq!( + names.get("mcp_linear_getteams").map(String::as_str), + Some("getteams") + ); + } + + #[test] + fn register_discovered_tool_skips_duplicate_from_same_server() { + // Two raw names from the same server can normalize to the same + // namespaced form when one contains a hyphen and the other an + // underscore (`get-teams` and `get_teams` both → `mcp_linear_get_teams`). + // The second registration must be dropped, not silently overwrite. + let mut tools = Vec::new(); + let mut names = HashMap::new(); + + register_discovered_tool( + "linear", + "get-teams", + "list teams", + serde_json::json!({"type": "object"}), + &mut tools, + &mut names, + ); + register_discovered_tool( + "linear", + "get_teams", + "list teams (dup)", + serde_json::json!({"type": "object"}), + &mut tools, + &mut names, + ); + + let count = tools + .iter() + .filter(|t| t.name == "mcp_linear_get_teams") + .count(); + assert_eq!(count, 1, "duplicate namespaced name must be dropped"); + // First registration wins — original name preserved as hyphenated form. + assert_eq!( + names.get("mcp_linear_get_teams").map(String::as_str), + Some("get-teams") + ); + } + + #[test] + fn register_discovered_tool_drops_namespaced_collision_with_builtin() { + // Synthetic: pretend "file_read" got added to RESERVED_BUILTIN_NAMES + // as the namespaced form. Today the namespaced form is + // `mcp_{server}_{tool}`, so a true builtin collision can only happen + // if a future built-in lives under `mcp_*`. We assert the gate + // refuses any name that IS in the reserved list by constructing a + // server name "" and tool name "file_read" — which normalize to + // `mcp__file_read` (not reserved). So this test instead validates + // the gate's structural correctness: every reserved name causes + // `is_reserved_builtin` to return true, full stop. + for r in RESERVED_BUILTIN_NAMES { + assert!(is_reserved_builtin(r)); + assert!(!is_reserved_builtin(&format!("mcp_x_{}", r))); + } + } } diff --git a/crates/openfang-runtime/src/workspace_sandbox.rs b/crates/openfang-runtime/src/workspace_sandbox.rs index ee1300f83..2484e3082 100644 --- a/crates/openfang-runtime/src/workspace_sandbox.rs +++ b/crates/openfang-runtime/src/workspace_sandbox.rs @@ -10,6 +10,12 @@ //! misconfigured agent manifest (e.g. `workspace_root = "~/.openfang"`) or a //! future tool surface that bypasses workspace confinement cannot exfiltrate //! these files. +//! +//! A second deny region covers sensitive paths under the user's `$HOME` but +//! outside `$OPENFANG_HOME` — notably `~/.mcp-auth/`, which holds OAuth +//! tokens for `mcp-remote` and other MCP clients. Same threat model: a +//! misconfigured `workspace_root` (e.g. `~`) or a bypass of workspace +//! confinement must not be able to read those tokens. use std::path::{Path, PathBuf}; @@ -70,6 +76,31 @@ pub(crate) fn is_sensitive_openfang_path(path: &Path) -> Option<&'static str> { } } +/// Returns `Some(reason)` if `path` resolves to a sensitive directory under +/// the user's `$HOME` but outside `$OPENFANG_HOME` that must never be read +/// or written by an agent, regardless of `workspace_root`. +/// +/// Companion to [`is_sensitive_openfang_path`] for credentials that live +/// outside `$OPENFANG_HOME`. As of writing the only entry is `~/.mcp-auth/` +/// (OAuth tokens used by `mcp-remote` and similar MCP clients). Add new +/// first-component patterns conservatively: this fires for *any* agent on +/// *any* workspace, including misconfigured ones, so over-denial here costs +/// more than under-denial elsewhere. +/// +/// `path` should be canonicalized before this check. +pub(crate) fn is_sensitive_home_path(path: &Path) -> Option<&'static str> { + let home = std::env::var_os("HOME").map(PathBuf::from)?; + let canon_home = home.canonicalize().ok()?; + let rel = path.strip_prefix(&canon_home).ok()?; + let first = rel.components().next()?.as_os_str().to_str()?; + + match first { + // OAuth tokens for mcp-remote and similar MCP clients. + ".mcp-auth" => Some("oauth-tokens"), + _ => None, + } +} + /// Resolve a user-supplied path within a workspace sandbox. /// /// - Rejects `..` components outright. @@ -137,6 +168,24 @@ pub fn resolve_sandbox_path(user_path: &str, workspace_root: &Path) -> Result