Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions src/agents/mcp/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,61 @@ async def _resolve_meta(
raise TypeError("MCP meta resolver must return a dict or None.")
return result

@staticmethod
def _renest_flattened_arguments(
input_schema: Any,
arguments: dict[str, Any],
) -> dict[str, Any] | None:
"""Re-nest arguments that the model flattened past a nested object property.

The Realtime API does not support strict JSON schema enforcement on function
tools (the `RealtimeFunctionTool` type has no `strict` field), so models will
occasionally ignore a nested-object property in an MCP tool's input schema
and emit its inner fields at the top level. When the schema has exactly one
missing object-typed property that can unambiguously absorb the extra
top-level keys, restore the nesting before forwarding to the MCP server.
Returns the corrected arguments, or None when no safe correction applies.
"""
if not isinstance(input_schema, dict):
return None
properties = input_schema.get("properties")
if not isinstance(properties, dict) or not properties:
return None

top_level_keys = set(properties.keys())
extras = [key for key in arguments if key not in top_level_keys]
if not extras:
return None

candidates: list[tuple[str, dict[str, Any]]] = []
for name, prop in properties.items():
if not isinstance(prop, dict):
continue
if prop.get("type") != "object":
continue
if name in arguments:
continue
candidates.append((name, prop))

if len(candidates) != 1:
return None

candidate_name, candidate_schema = candidates[0]
candidate_properties = candidate_schema.get("properties")
if isinstance(candidate_properties, dict) and candidate_properties:
# The nested object declares its own properties; only re-nest when every
# extra is a declared inner property to avoid corrupting structured
# schemas where the extras may belong elsewhere.
allowed = set(candidate_properties.keys())
if not all(extra in allowed for extra in extras):
return None
elif candidate_schema.get("additionalProperties") is False:
return None

renested = {key: value for key, value in arguments.items() if key not in extras}
renested[candidate_name] = {key: arguments[key] for key in extras}
return renested

@classmethod
async def invoke_mcp_tool(
cls,
Expand Down Expand Up @@ -603,6 +658,10 @@ async def invoke_mcp_tool(
f"Invalid JSON input for tool {tool_name_for_display}: expected a JSON object"
)

renested = cls._renest_flattened_arguments(tool.inputSchema, json_data)
if renested is not None:
json_data = renested

if _debug.DONT_LOG_TOOL_DATA:
logger.debug(f"Invoking MCP tool {tool_name_for_display}")
else:
Expand Down
151 changes: 151 additions & 0 deletions tests/mcp/test_mcp_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,157 @@ async def test_invoke_mcp_tool():
# Just making sure it doesn't crash


def _set_properties_schema() -> dict[str, Any]:
return {
"type": "object",
"properties": {
"target": {"type": "string"},
"keysAndValues": {"type": "object"},
},
"required": ["target", "keysAndValues"],
}


@pytest.mark.asyncio
async def test_invoke_mcp_tool_renests_flattened_open_object():
"""Realtime models drop nested-object wrappers when strict mode is unavailable.

The OpenAI Realtime API has no `strict` field on `RealtimeFunctionTool`, so the
model receives the correct schema but may flatten arguments destined for an
open-ended object property. Re-nesting on the SDK side keeps the MCP call
intact for the unambiguous single-absorber case.
"""
schema = _set_properties_schema()
server = FakeMCPServer()
server.add_tool("SetProperties", schema)

ctx = RunContextWrapper(context=None)
tool = MCPTool(name="SetProperties", inputSchema=schema)

flattened = '{"target": "MyTarget", "visible": false, "color": "red"}'
await MCPUtil.invoke_mcp_tool(server, tool, ctx, flattened)

result = server.tool_results[-1]
prefix = f"result_{tool.name}_"
assert result.startswith(prefix)
args = json.loads(result[len(prefix) :])
assert args == {
"target": "MyTarget",
"keysAndValues": {"visible": False, "color": "red"},
}


@pytest.mark.asyncio
async def test_invoke_mcp_tool_preserves_already_nested_arguments():
schema = _set_properties_schema()
server = FakeMCPServer()
server.add_tool("SetProperties", schema)

ctx = RunContextWrapper(context=None)
tool = MCPTool(name="SetProperties", inputSchema=schema)

nested = '{"target": "MyTarget", "keysAndValues": {"visible": false}}'
await MCPUtil.invoke_mcp_tool(server, tool, ctx, nested)

result = server.tool_results[-1]
prefix = f"result_{tool.name}_"
args = json.loads(result[len(prefix) :])
assert args == {"target": "MyTarget", "keysAndValues": {"visible": False}}


@pytest.mark.asyncio
async def test_invoke_mcp_tool_skips_renesting_when_ambiguous():
"""Two missing object-typed siblings cannot be unambiguously absorbed."""
schema = {
"type": "object",
"properties": {
"target": {"type": "string"},
"keysAndValues": {"type": "object"},
"metadata": {"type": "object"},
},
"required": ["target"],
}
server = FakeMCPServer()
server.add_tool("Ambiguous", schema)

ctx = RunContextWrapper(context=None)
tool = MCPTool(name="Ambiguous", inputSchema=schema)

flattened = '{"target": "MyTarget", "visible": false}'
await MCPUtil.invoke_mcp_tool(server, tool, ctx, flattened)

result = server.tool_results[-1]
prefix = f"result_{tool.name}_"
args = json.loads(result[len(prefix) :])
# No re-nesting attempted; args reach the server unchanged.
assert args == {"target": "MyTarget", "visible": False}


@pytest.mark.asyncio
async def test_invoke_mcp_tool_skips_renesting_for_unknown_inner_keys():
"""Closed nested schemas should not absorb unknown extras."""
schema = {
"type": "object",
"properties": {
"target": {"type": "string"},
"keysAndValues": {
"type": "object",
"properties": {"visible": {"type": "boolean"}},
"additionalProperties": False,
},
},
"required": ["target", "keysAndValues"],
}
server = FakeMCPServer()
server.add_tool("Strict", schema)

ctx = RunContextWrapper(context=None)
tool = MCPTool(name="Strict", inputSchema=schema)

flattened = '{"target": "MyTarget", "visible": false, "unknown": 1}'
await MCPUtil.invoke_mcp_tool(server, tool, ctx, flattened)

result = server.tool_results[-1]
prefix = f"result_{tool.name}_"
args = json.loads(result[len(prefix) :])
assert args == {"target": "MyTarget", "visible": False, "unknown": 1}


@pytest.mark.asyncio
async def test_invoke_mcp_tool_renests_for_typed_inner_properties():
"""When extras match the nested object's declared properties, absorb them."""
schema = {
"type": "object",
"properties": {
"target": {"type": "string"},
"keysAndValues": {
"type": "object",
"properties": {
"visible": {"type": "boolean"},
"color": {"type": "string"},
},
},
},
"required": ["target", "keysAndValues"],
}
server = FakeMCPServer()
server.add_tool("Typed", schema)

ctx = RunContextWrapper(context=None)
tool = MCPTool(name="Typed", inputSchema=schema)

flattened = '{"target": "MyTarget", "visible": false, "color": "red"}'
await MCPUtil.invoke_mcp_tool(server, tool, ctx, flattened)

result = server.tool_results[-1]
prefix = f"result_{tool.name}_"
args = json.loads(result[len(prefix) :])
assert args == {
"target": "MyTarget",
"keysAndValues": {"visible": False, "color": "red"},
}


@pytest.mark.asyncio
async def test_mcp_meta_resolver_merges_and_passes():
captured: dict[str, Any] = {}
Expand Down