Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 119 additions & 10 deletions Server/src/transport/plugin_hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from typing import Any, ClassVar

from starlette.endpoints import WebSocketEndpoint
from starlette.websockets import WebSocket
from starlette.websockets import WebSocket, WebSocketState

from core.config import config
from core.constants import API_KEY_HEADER
Expand Down Expand Up @@ -546,11 +546,102 @@ async def _get_connection(cls, session_id: str) -> WebSocket:
raise RuntimeError(f"Plugin session {session_id} not connected")
return websocket

@classmethod
async def _evict_connection(cls, session_id: str, reason: str) -> None:
"""Drop a stale session from in-memory maps and registry."""
lock = cls._lock
if lock is None:
return

websocket: WebSocket | None = None
ping_task: asyncio.Task | None = None
pending_futures: list[asyncio.Future] = []
async with lock:
websocket = cls._connections.pop(session_id, None)
ping_task = cls._ping_tasks.pop(session_id, None)
cls._last_pong.pop(session_id, None)
keys_to_remove: list[object] = []
for key, entry in list(cls._pending.items()):
if entry.get("session_id") == session_id:
future = entry.get("future")
if future and not future.done():
pending_futures.append(future)
keys_to_remove.append(key)
for key in keys_to_remove:
cls._pending.pop(key, None)

if ping_task is not None and not ping_task.done():
ping_task.cancel()

for future in pending_futures:
if not future.done():
future.set_exception(
PluginDisconnectedError(
f"Unity plugin session {session_id} disconnected while awaiting command_result"
)
)

if websocket is not None:
try:
await websocket.close(code=1001)
except Exception:
pass
Comment on lines +579 to +583
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Add debug logging for the swallowed WebSocket close exception.

The except Exception: pass silently drops any close error, unlike the analogous block in _ping_loop (lines 509–511) which logs it. Given that _evict_connection is explicitly cleaning up a stale socket, even a debug-level log here would aid diagnostics.

🪵 Proposed fix
-        if websocket is not None:
-            try:
-                await websocket.close(code=1001)
-            except Exception:
-                pass
+        if websocket is not None:
+            try:
+                await websocket.close(code=1001)
+            except Exception as close_ex:
+                logger.debug("Error closing evicted WebSocket for session %s: %s", session_id, close_ex)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if websocket is not None:
try:
await websocket.close(code=1001)
except Exception:
pass
if websocket is not None:
try:
await websocket.close(code=1001)
except Exception as close_ex:
logger.debug("Error closing evicted WebSocket for session %s: %s", session_id, close_ex)
🧰 Tools
🪛 Ruff (0.15.1)

[error] 582-583: try-except-pass detected, consider logging the exception

(S110)


[warning] 582-582: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@Server/src/transport/plugin_hub.py` around lines 579 - 583, In
_evict_connection where you close the stale websocket (the websocket.close
call), replace the silent except with a debug log that mirrors the behavior in
_ping_loop: catch Exception as e and call the same logger used in _ping_loop to
emit a debug-level message including the exception (e) and context that the
close failed for the stale websocket; ensure you reference the websocket
variable in the log so diagnostics can tie the error to the connection being
evicted.


if cls._registry is not None:
try:
await cls._registry.unregister(session_id)
except Exception:
logger.debug(
"Failed to unregister evicted plugin session %s",
session_id,
exc_info=True,
)

logger.debug("Evicted plugin session %s (%s)", session_id, reason)

@classmethod
async def _ensure_live_connection(cls, session_id: str) -> bool:
"""Best-effort pre-send liveness check for a plugin WebSocket."""
try:
websocket = await cls._get_connection(session_id)
except RuntimeError:
await cls._evict_connection(session_id, "missing_websocket")
return False

if (
websocket.client_state == WebSocketState.CONNECTED
and websocket.application_state == WebSocketState.CONNECTED
):
return True

logger.debug(
"Detected stale plugin connection before send: session=%s app_state=%s client_state=%s",
session_id,
websocket.application_state,
websocket.client_state,
)
await cls._evict_connection(session_id, "stale_websocket_state")
return False

@staticmethod
def _unavailable_retry_response(reason: str = "no_unity_session") -> dict[str, Any]:
return MCPResponse(
success=False,
error="Unity session not available; please retry",
hint="retry",
data={"reason": reason, "retry_after_ms": 250},
).model_dump()

# ------------------------------------------------------------------
# Session resolution helpers
# ------------------------------------------------------------------
@classmethod
async def _resolve_session_id(cls, unity_instance: str | None, user_id: str | None = None) -> str:
async def _resolve_session_id(
cls,
unity_instance: str | None,
user_id: str | None = None,
retry_on_reload: bool = True,
) -> str:
"""Resolve a project hash (Unity instance id) to an active plugin session.

During Unity domain reloads the plugin's WebSocket session is torn down
Expand All @@ -561,6 +652,7 @@ async def _resolve_session_id(cls, unity_instance: str | None, user_id: str | No
Args:
unity_instance: Target instance (Name@hash or hash)
user_id: User ID from API key validation (for remote-hosted mode session isolation)
retry_on_reload: If False, do not wait for reconnects when no session is present.
"""
if cls._registry is None:
raise RuntimeError("Plugin registry not configured")
Expand Down Expand Up @@ -589,6 +681,8 @@ async def _resolve_session_id(cls, unity_instance: str | None, user_id: str | No
max_wait_s = 20.0
# Clamp to [0, 20] to prevent misconfiguration from causing excessive waits
max_wait_s = max(0.0, min(max_wait_s, 20.0))
if not retry_on_reload:
max_wait_s = 0.0
retry_ms = float(getattr(config, "reload_retry_ms", 250))
sleep_seconds = max(0.05, min(0.25, retry_ms / 1000.0))

Expand Down Expand Up @@ -684,6 +778,7 @@ async def send_command_for_instance(
command_type: str,
params: dict[str, Any],
user_id: str | None = None,
retry_on_reload: bool = True,
) -> dict[str, Any]:
"""Send a command to a Unity instance.

Expand All @@ -692,28 +787,42 @@ async def send_command_for_instance(
command_type: Command type to execute
params: Command parameters
user_id: User ID for session isolation in remote-hosted mode
retry_on_reload: If False, do not wait for session reconnect on reload.
"""
try:
session_id = await cls._resolve_session_id(unity_instance, user_id=user_id)
session_id = await cls._resolve_session_id(
unity_instance,
user_id=user_id,
retry_on_reload=retry_on_reload,
)
except NoUnitySessionError:
logger.debug(
"Unity session unavailable; returning retry: command=%s instance=%s",
command_type,
unity_instance or "default",
)
return MCPResponse(
success=False,
error="Unity session not available; please retry",
hint="retry",
data={"reason": "no_unity_session", "retry_after_ms": 250},
).model_dump()
return cls._unavailable_retry_response("no_unity_session")

if not await cls._ensure_live_connection(session_id):
if not retry_on_reload:
return cls._unavailable_retry_response("stale_connection")
try:
session_id = await cls._resolve_session_id(
unity_instance,
user_id=user_id,
retry_on_reload=True,
)
except NoUnitySessionError:
return cls._unavailable_retry_response("no_unity_session")
if not await cls._ensure_live_connection(session_id):
return cls._unavailable_retry_response("stale_connection")

# During domain reload / immediate reconnect windows, the plugin may be connected but not yet
# ready to process execute commands on the Unity main thread (which can be further delayed when
# the Unity Editor is unfocused). For fast-path commands, we do a bounded readiness probe using
# a main-thread ping command (handled by TransportCommandDispatcher) rather than waiting on
# register_tools (which can be delayed by EditorApplication.delayCall).
if command_type in cls._FAST_FAIL_COMMANDS and command_type != "ping":
if retry_on_reload and command_type in cls._FAST_FAIL_COMMANDS and command_type != "ping":
try:
max_wait_s = float(os.environ.get(
"UNITY_MCP_SESSION_READY_WAIT_SECONDS", "6"))
Expand Down
5 changes: 5 additions & 0 deletions Server/src/transport/unity_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,17 @@ async def send_with_unity_instance(
).model_dump()
)

retry_on_reload = kwargs.pop("retry_on_reload", True)
if not isinstance(retry_on_reload, bool):
retry_on_reload = True

try:
raw = await PluginHub.send_command_for_instance(
unity_instance,
command_type,
params,
user_id=user_id,
retry_on_reload=retry_on_reload,
)
return normalize_unity_response(raw)
except Exception as exc:
Expand Down
60 changes: 60 additions & 0 deletions Server/tests/integration/test_domain_reload_resilience.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,66 @@ async def mock_list_sessions(**kwargs):
PluginHub._lock = original_lock


@pytest.mark.asyncio
async def test_plugin_hub_no_wait_when_retry_disabled(monkeypatch):
"""retry_on_reload=False should skip reconnect wait loops."""
from transport.plugin_hub import PluginHub, NoUnitySessionError
from transport.plugin_registry import PluginRegistry

mock_registry = AsyncMock(spec=PluginRegistry)
mock_registry.get_session_id_by_hash = AsyncMock(return_value=None)
mock_registry.list_sessions = AsyncMock(return_value={})

original_registry = PluginHub._registry
original_lock = PluginHub._lock
PluginHub._registry = mock_registry
PluginHub._lock = asyncio.Lock()

monkeypatch.setenv("UNITY_MCP_SESSION_RESOLVE_MAX_WAIT_S", "20.0")

try:
with pytest.raises(NoUnitySessionError):
await PluginHub._resolve_session_id(
unity_instance="hash-missing",
retry_on_reload=False,
)

assert mock_registry.get_session_id_by_hash.await_count == 1
assert mock_registry.list_sessions.await_count == 1
finally:
PluginHub._registry = original_registry
PluginHub._lock = original_lock


@pytest.mark.asyncio
async def test_send_command_for_instance_fails_fast_on_stale_when_retry_disabled(monkeypatch):
"""Stale HTTP session should not send command when retry_on_reload is disabled."""
from transport.plugin_hub import PluginHub

resolve_mock = AsyncMock(return_value="sess-stale")
ensure_mock = AsyncMock(return_value=False)
send_mock = AsyncMock()

monkeypatch.setattr(PluginHub, "_resolve_session_id", resolve_mock)
monkeypatch.setattr(PluginHub, "_ensure_live_connection", ensure_mock)
monkeypatch.setattr(PluginHub, "send_command", send_mock)

result = await PluginHub.send_command_for_instance(
unity_instance="Project@hash-stale",
command_type="manage_script",
params={"action": "edit"},
retry_on_reload=False,
)

assert result["success"] is False
assert result["hint"] == "retry"
assert result.get("data", {}).get("reason") == "stale_connection"
assert resolve_mock.await_count == 1
_, resolve_kwargs = resolve_mock.await_args
assert resolve_kwargs.get("retry_on_reload") is False
send_mock.assert_not_awaited()


@pytest.mark.asyncio
async def test_read_console_during_simulated_reload(monkeypatch):
"""
Expand Down
33 changes: 33 additions & 0 deletions Server/tests/integration/test_transport_smoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,39 @@ async def _unused_send_fn(*_args, **_kwargs):
assert result["data"] == {"via": "http-remote"}


@pytest.mark.asyncio
async def test_http_forwards_retry_on_reload(monkeypatch):
"""HTTP transport should pass retry_on_reload through to PluginHub."""
monkeypatch.setattr(config, "transport_mode", "http")
monkeypatch.setattr(config, "http_remote_hosted", False)

captured: dict[str, object] = {}

async def fake_send_command_for_instance(_instance, _command, _params, **kwargs):
captured.update(kwargs)
return {"status": "success", "result": {"data": {"via": "http"}}}

monkeypatch.setattr(
unity_transport.PluginHub,
"send_command_for_instance",
fake_send_command_for_instance,
)

async def _unused_send_fn(*_args, **_kwargs):
raise AssertionError("send_fn should not be used in HTTP mode")

result = await unity_transport.send_with_unity_instance(
_unused_send_fn,
None,
"manage_script",
{"action": "edit"},
retry_on_reload=False,
)

assert result["success"] is True
assert captured.get("retry_on_reload") is False


@pytest.mark.asyncio
async def test_stdio_smoke(monkeypatch):
"""Stdio transport should call the legacy send fn with instance_id."""
Expand Down