diff --git a/agentrun/integration/builtin/sandbox.py b/agentrun/integration/builtin/sandbox.py index 0456799..fc0b328 100644 --- a/agentrun/integration/builtin/sandbox.py +++ b/agentrun/integration/builtin/sandbox.py @@ -26,6 +26,16 @@ class PlaywrightError(Exception): # type: ignore[no-redef] pass +try: + from greenlet import error as GreenletError +except ImportError: + + class GreenletError(Exception): # type: ignore[no-redef] + """Fallback greenlet error used when greenlet is not installed.""" + + pass + + class SandboxToolSet(CommonToolSet): """沙箱工具集基类 @@ -727,24 +737,47 @@ def __init__( polar_fs_config=polar_fs_config, ) self._playwright_sync: Optional["BrowserPlaywrightSync"] = None + self._playwright_thread: Optional[threading.Thread] = None def _get_playwright(self, sb: BrowserSandbox) -> "BrowserPlaywrightSync": """获取或创建 Playwright 连接 / Get or create Playwright connection 复用已有连接以减少连接建立开销和瞬态错误。 使用双重检查锁定避免并发调用时创建多个连接导致资源泄漏。 + 当创建连接的线程已退出时,自动重建连接(Playwright greenlet 绑定到创建它的线程)。 + Reuses existing connection to reduce connection overhead and transient errors. Uses double-checked locking to avoid leaking connections under concurrent calls. + Automatically recreates the connection when the thread that created it has exited, + because Playwright's internal greenlet is bound to the thread that created it. """ - if self._playwright_sync is not None: - return self._playwright_sync + if self._playwright_sync is not None and self._playwright_thread is not None: + current_thread = threading.current_thread() + creator_thread = self._playwright_thread + if not creator_thread.is_alive() or current_thread is not creator_thread: + if not creator_thread.is_alive(): + logger.debug( + "Playwright creating thread (id=%s) has exited, recreating" + " connection", + creator_thread.ident, + ) + else: + logger.debug( + "Playwright creating thread (id=%s) differs from current" + " thread (id=%s), recreating connection", + creator_thread.ident, + current_thread.ident, + ) + self._reset_playwright() - with self.lock: - if self._playwright_sync is None: - playwright_sync = sb.sync_playwright() - playwright_sync.open() - self._playwright_sync = playwright_sync - return self._playwright_sync + if self._playwright_sync is None: + with self.lock: + if self._playwright_sync is None: + playwright_sync = sb.sync_playwright() + playwright_sync.open() + self._playwright_sync = playwright_sync + self._playwright_thread = threading.current_thread() + return self._playwright_sync def _reset_playwright(self) -> None: """重置 Playwright 连接 / Reset Playwright connection @@ -763,6 +796,7 @@ def _reset_playwright(self) -> None: exc_info=True, ) self._playwright_sync = None + self._playwright_thread = None def _run_in_sandbox(self, callback: Callable[[Sandbox], Any]) -> Any: """在沙箱中执行操作,智能区分错误类型 / Execute in sandbox with smart error handling @@ -812,6 +846,22 @@ def _run_in_sandbox(self, callback: Callable[[Sandbox], Any]) -> Any: "Browser tool-level error (no sandbox rebuild): %s", e ) return {"error": f"{e!s}"} + except GreenletError as e: + logger.debug( + "Greenlet thread-binding error, resetting Playwright: %s", + e, + ) + # Keep the existing sandbox (it is still healthy); only the + # Playwright connection needs to be recreated on this thread. + try: + self._reset_playwright() + return callback(sb) + except Exception as e2: + logger.debug( + "Retry after Playwright reset failed: %s", + e2, + ) + return {"error": f"{e!s}"} except Exception as e: logger.debug("Unexpected error in browser sandbox: %s", e) return {"error": f"{e!s}"} @@ -881,7 +931,7 @@ def inner(sb: Sandbox): def browser_navigate( self, url: str, - wait_until: str = "load", + wait_until: str = "domcontentloaded", timeout: Optional[float] = None, ) -> Dict[str, Any]: """导航到 URL / Navigate to URL""" diff --git a/tests/unittests/integration/langchain/test_agent_invoke_methods.py b/tests/unittests/integration/langchain/test_agent_invoke_methods_unittests.py similarity index 100% rename from tests/unittests/integration/langchain/test_agent_invoke_methods.py rename to tests/unittests/integration/langchain/test_agent_invoke_methods_unittests.py diff --git a/tests/unittests/integration/test_agentscope.py b/tests/unittests/integration/test_agentscope.py index cdcd6e0..022d2dd 100644 --- a/tests/unittests/integration/test_agentscope.py +++ b/tests/unittests/integration/test_agentscope.py @@ -21,7 +21,7 @@ from .scenarios import Scenarios -class TestToolSet(CommonToolSet): +class SampleToolSet(CommonToolSet): """测试用工具集""" def __init__(self, timezone: str = "UTC"): @@ -150,9 +150,9 @@ def mocked_model( return model("mock-model") @pytest.fixture - def mocked_toolset(self) -> TestToolSet: + def mocked_toolset(self) -> SampleToolSet: """创建 mock 的工具集""" - return TestToolSet(timezone="UTC") + return SampleToolSet(timezone="UTC") # ========================================================================= # 测试:简单对话(无工具调用) @@ -194,7 +194,7 @@ async def test_multi_tool_calls( self, mock_server: MockLLMServer, mocked_model: CommonModel, - mocked_toolset: TestToolSet, + mocked_toolset: SampleToolSet, ): """测试多工具同时调用""" # 使用默认的多工具场景 @@ -223,7 +223,7 @@ async def test_stream_options_validation( self, mock_server: MockLLMServer, mocked_model: CommonModel, - mocked_toolset: TestToolSet, + mocked_toolset: SampleToolSet, ): """测试 stream_options 在请求中的正确性""" # 使用默认场景 diff --git a/tests/unittests/integration/test_browser_toolset_error_handling.py b/tests/unittests/integration/test_browser_toolset_error_handling.py index fb1ea71..a0e3924 100644 --- a/tests/unittests/integration/test_browser_toolset_error_handling.py +++ b/tests/unittests/integration/test_browser_toolset_error_handling.py @@ -91,6 +91,7 @@ def toolset(self, mock_sandbox): with patch.object(BrowserToolSet, "__init__", lambda self: None): ts = BrowserToolSet() ts._playwright_sync = None + ts._playwright_thread = None ts.sandbox = mock_sandbox ts.sandbox_id = "test-sandbox-id" ts.lock = MagicMock() @@ -218,6 +219,7 @@ def toolset(self, mock_sandbox): with patch.object(BrowserToolSet, "__init__", lambda self: None): ts = BrowserToolSet() ts._playwright_sync = None + ts._playwright_thread = None ts.sandbox = mock_sandbox ts.sandbox_id = "test-sandbox-id" ts.lock = threading.Lock() @@ -252,15 +254,19 @@ def test_reset_playwright_handles_close_error(self, toolset, mock_sandbox): assert toolset._playwright_sync is None - def test_concurrent_get_playwright_creates_only_one_connection( + def test_concurrent_get_playwright_each_thread_gets_own_connection( self, toolset, mock_sandbox ): - """测试并发调用 _get_playwright 只创建一个连接,不会泄漏""" - barrier = threading.Barrier(5) + """测试并发调用 _get_playwright 时每个线程各自创建连接 + + Playwright Sync API 的 greenlet 绑定到创建它的 OS 线程, + 不能跨线程共享。每个工作线程必须创建自己的连接。 + """ + start_barrier = threading.Barrier(5) results: list = [] def worker(): - barrier.wait() + start_barrier.wait() p = toolset._get_playwright(mock_sandbox) results.append(p) @@ -270,9 +276,8 @@ def worker(): for t in threads: t.join() + # Every thread must have received a connection assert len(results) == 5 - assert all(p is results[0] for p in results) - mock_sandbox.sync_playwright.assert_called_once() class TestBrowserToolSetClose: @@ -289,6 +294,7 @@ def toolset(self, mock_sandbox): with patch.object(BrowserToolSet, "__init__", lambda self: None): ts = BrowserToolSet() ts._playwright_sync = MagicMock() + ts._playwright_thread = threading.current_thread() ts.sandbox = mock_sandbox ts.sandbox_id = "test-sandbox-id" ts.lock = threading.Lock() @@ -307,3 +313,210 @@ def test_close_cleans_up_playwright_and_sandbox( mock_sandbox.stop.assert_called_once() assert toolset.sandbox is None assert toolset.sandbox_id == "" + + +class TestBrowserToolSetThreadAwareness: + """测试 _get_playwright 的线程感知行为 / Tests for thread-aware Playwright caching""" + + @pytest.fixture + def mock_sandbox(self): + """创建模拟的沙箱""" + sb = MagicMock() + sb.sync_playwright.return_value = MagicMock() + return sb + + @pytest.fixture + def toolset(self, mock_sandbox): + """创建带有模拟沙箱的 BrowserToolSet 实例""" + with patch.object(BrowserToolSet, "__init__", lambda self: None): + ts = BrowserToolSet() + ts._playwright_sync = None + ts._playwright_thread = None + ts.sandbox = mock_sandbox + ts.sandbox_id = "test-sandbox-id" + ts.lock = threading.Lock() + return ts + + def test_get_playwright_records_creating_thread( + self, toolset, mock_sandbox + ): + """测试 _get_playwright 记录创建连接的线程""" + toolset._get_playwright(mock_sandbox) + + assert toolset._playwright_thread is threading.current_thread() + + def test_get_playwright_same_thread_reuses_connection( + self, toolset, mock_sandbox + ): + """测试同一线程多次调用复用连接""" + p1 = toolset._get_playwright(mock_sandbox) + p2 = toolset._get_playwright(mock_sandbox) + + assert p1 is p2 + mock_sandbox.sync_playwright.assert_called_once() + + def test_get_playwright_dead_thread_recreates_connection( + self, toolset, mock_sandbox + ): + """测试创建线程退出后重建 Playwright 连接(Bug 1 修复) + + 模拟 LangGraph ToolNode 的行为:每次工具调用在不同的线程上执行。 + 当创建连接的工作线程退出后,缓存的 Playwright 实例必须重建, + 因为 Playwright 内部 greenlet 绑定到创建它的线程。 + """ + first_instance: list = [] + second_instance: list = [] + + def first_call(): + p = toolset._get_playwright(mock_sandbox) + first_instance.append(p) + + t1 = threading.Thread(target=first_call) + t1.start() + t1.join() + # t1 has now exited — its greenlet binding is dead + + def second_call(): + p = toolset._get_playwright(mock_sandbox) + second_instance.append(p) + + t2 = threading.Thread(target=second_call) + t2.start() + t2.join() + + assert len(first_instance) == 1 + assert len(second_instance) == 1 + # A new connection must have been created for the second call + assert mock_sandbox.sync_playwright.call_count == 2 + + def test_get_playwright_different_live_thread_recreates_connection( + self, toolset, mock_sandbox + ): + """测试从不同线程调用时,即使创建线程仍存活也会重建连接 + + Playwright Sync API 的 greenlet 绑定到创建它的 OS 线程, + 即使创建线程仍存活,在另一个线程上调用也不安全。 + 每个调用线程必须获得自己的连接。 + """ + results: list = [] + + # Create connection in main thread first + toolset._get_playwright(mock_sandbox) + # The creating thread (main test thread) is still alive + + # A different thread must receive its own new connection + def worker(): + p = toolset._get_playwright(mock_sandbox) + results.append(p) + + t = threading.Thread(target=worker) + t.start() + t.join() + + assert len(results) == 1 + # A new connection must have been created for the worker thread + assert mock_sandbox.sync_playwright.call_count == 2 + + def test_reset_playwright_clears_thread(self, toolset, mock_sandbox): + """测试 _reset_playwright 清理线程引用""" + toolset._get_playwright(mock_sandbox) + assert toolset._playwright_thread is not None + + toolset._reset_playwright() + + assert toolset._playwright_thread is None + assert toolset._playwright_sync is None + + +class TestBrowserToolSetGreenletErrorHandling: + """测试 _run_in_sandbox 对 greenlet 死亡错误的处理(Bug 3 修复)""" + + @pytest.fixture + def mock_sandbox(self): + """创建模拟的沙箱""" + return MagicMock() + + @pytest.fixture + def toolset(self, mock_sandbox): + """创建带有模拟沙箱的 BrowserToolSet 实例""" + with patch.object(BrowserToolSet, "__init__", lambda self: None): + ts = BrowserToolSet() + ts._playwright_sync = None + ts._playwright_thread = None + ts.sandbox = mock_sandbox + ts.sandbox_id = "test-sandbox-id" + ts.lock = MagicMock() + ts._reset_playwright = MagicMock() + ts._ensure_sandbox = MagicMock(return_value=mock_sandbox) + return ts + + def test_greenlet_error_resets_playwright_keeps_sandbox_and_retries( + self, toolset, mock_sandbox + ): + """測試 greenlet.error 触发 Playwright 重置、保留沙箱并重试 + + 当 greenlet.error 发生时,沙箱本身仍然健康(这是客户端线程亲和性问题), + 只需重置 Playwright 连接并在当前线程重试,不应销毁沙箱。 + """ + try: + from greenlet import error as GreenletError + except ImportError: + pytest.skip("greenlet not installed") + + call_count = 0 + + def callback(sb): + nonlocal call_count + call_count += 1 + if call_count == 1: + raise GreenletError( + "cannot switch to a different thread (which happens to have" + " exited)" + ) + return {"success": True} + + result = toolset._run_in_sandbox(callback) + + assert result == {"success": True} + assert call_count == 2 + toolset._reset_playwright.assert_called_once() + # Sandbox must be preserved — the error is client-side thread affinity, + # not a sandbox crash. + assert toolset.sandbox is mock_sandbox + + def test_greenlet_error_returns_error_if_retry_fails( + self, toolset, mock_sandbox + ): + """测试 greenlet.error 重试失败时返回错误字典""" + try: + from greenlet import error as GreenletError + except ImportError: + pytest.skip("greenlet not installed") + + def callback(sb): + raise GreenletError( + "cannot switch to a different thread (which happens to have" + " exited)" + ) + + result = toolset._run_in_sandbox(callback) + + assert "error" in result + toolset._reset_playwright.assert_called_once() + # Sandbox still preserved even after retry failure + assert toolset.sandbox is mock_sandbox + + def test_non_greenlet_unexpected_error_does_not_reset( + self, toolset, mock_sandbox + ): + """测试普通未知错误不触发 Playwright 重置""" + original_sandbox = toolset.sandbox + + def callback(sb): + raise ValueError("Some other unexpected error") + + result = toolset._run_in_sandbox(callback) + + assert "error" in result + toolset._reset_playwright.assert_not_called() + assert toolset.sandbox is original_sandbox diff --git a/tests/unittests/integration/test_crewai.py b/tests/unittests/integration/test_crewai.py index e004fc6..d59e89f 100644 --- a/tests/unittests/integration/test_crewai.py +++ b/tests/unittests/integration/test_crewai.py @@ -23,7 +23,7 @@ from .scenarios import Scenarios -class TestToolSet(CommonToolSet): +class SampleToolSet(CommonToolSet): """测试用工具集""" def __init__(self, timezone: str = "UTC"): @@ -149,9 +149,9 @@ def mocked_model( return model("mock-model") @pytest.fixture - def mocked_toolset(self) -> TestToolSet: + def mocked_toolset(self) -> SampleToolSet: """创建 mock 的工具集""" - return TestToolSet(timezone="UTC") + return SampleToolSet(timezone="UTC") # ========================================================================= # 测试:简单对话(无工具调用) @@ -191,7 +191,7 @@ def test_multi_tool_calls( self, mock_server: MockLLMServer, mocked_model: CommonModel, - mocked_toolset: TestToolSet, + mocked_toolset: SampleToolSet, ): """测试多工具同时调用 @@ -210,7 +210,7 @@ def test_stream_options_validation( self, mock_server: MockLLMServer, mocked_model: CommonModel, - mocked_toolset: TestToolSet, + mocked_toolset: SampleToolSet, ): """测试 stream_options 在请求中的正确性 diff --git a/tests/unittests/integration/test_google_adk.py b/tests/unittests/integration/test_google_adk.py index 50287f2..801985a 100644 --- a/tests/unittests/integration/test_google_adk.py +++ b/tests/unittests/integration/test_google_adk.py @@ -21,7 +21,7 @@ from .scenarios import Scenarios -class TestToolSet(CommonToolSet): +class SampleToolSet(CommonToolSet): """测试用工具集""" def __init__(self, timezone: str = "UTC"): @@ -208,9 +208,9 @@ def mocked_model( return model("mock-model") @pytest.fixture - def mocked_toolset(self) -> TestToolSet: + def mocked_toolset(self) -> SampleToolSet: """创建 mock 的工具集""" - return TestToolSet(timezone="UTC") + return SampleToolSet(timezone="UTC") # ========================================================================= # 测试:简单对话(无工具调用) @@ -252,7 +252,7 @@ async def test_single_tool_call( self, mock_server: MockLLMServer, mocked_model: CommonModel, - mocked_toolset: TestToolSet, + mocked_toolset: SampleToolSet, ): """测试单次工具调用""" # 配置场景 @@ -284,7 +284,7 @@ async def test_multi_tool_calls( self, mock_server: MockLLMServer, mocked_model: CommonModel, - mocked_toolset: TestToolSet, + mocked_toolset: SampleToolSet, ): """测试多工具同时调用""" # 使用默认的多工具场景 @@ -315,7 +315,7 @@ async def test_stream_options_validation( self, mock_server: MockLLMServer, mocked_model: CommonModel, - mocked_toolset: TestToolSet, + mocked_toolset: SampleToolSet, ): """测试 stream_options 在请求中的正确性""" # 使用默认场景 diff --git a/tests/unittests/integration/test_langchain.py b/tests/unittests/integration/test_langchain.py index 4587e42..712842a 100644 --- a/tests/unittests/integration/test_langchain.py +++ b/tests/unittests/integration/test_langchain.py @@ -22,7 +22,7 @@ from .scenarios import Scenarios -class TestToolSet(CommonToolSet): +class SampleToolSet(CommonToolSet): """测试用工具集""" def __init__(self, timezone: str = "UTC"): @@ -201,9 +201,9 @@ def mocked_model( return model("mock-model") @pytest.fixture - def mocked_toolset(self) -> TestToolSet: + def mocked_toolset(self) -> SampleToolSet: """创建 mock 的工具集""" - return TestToolSet(timezone="UTC") + return SampleToolSet(timezone="UTC") # ========================================================================= # 测试:简单对话(无工具调用) @@ -244,7 +244,7 @@ def test_single_tool_call( self, mock_server: MockLLMServer, mocked_model: CommonModel, - mocked_toolset: TestToolSet, + mocked_toolset: SampleToolSet, ): """测试单次工具调用""" # 配置场景 @@ -276,7 +276,7 @@ def test_multi_tool_calls( self, mock_server: MockLLMServer, mocked_model: CommonModel, - mocked_toolset: TestToolSet, + mocked_toolset: SampleToolSet, ): """测试多工具同时调用""" # 使用默认的多工具场景 @@ -307,7 +307,7 @@ def test_stream_options_in_requests( self, mock_server: MockLLMServer, mocked_model: CommonModel, - mocked_toolset: TestToolSet, + mocked_toolset: SampleToolSet, ): """测试请求中的 stream_options 设置""" from langchain_openai import ChatOpenAI @@ -324,7 +324,7 @@ def test_stream_options_validation( self, mock_server: MockLLMServer, mocked_model: CommonModel, - mocked_toolset: TestToolSet, + mocked_toolset: SampleToolSet, ): """测试 stream_options 在请求中的正确性""" # 使用默认场景 @@ -370,7 +370,7 @@ async def test_async_invoke( self, mock_server: MockLLMServer, mocked_model: CommonModel, - mocked_toolset: TestToolSet, + mocked_toolset: SampleToolSet, ): """测试异步调用""" # 使用默认场景 diff --git a/tests/unittests/integration/test_langgraph.py b/tests/unittests/integration/test_langgraph.py index d56e697..3bc4339 100644 --- a/tests/unittests/integration/test_langgraph.py +++ b/tests/unittests/integration/test_langgraph.py @@ -22,7 +22,7 @@ from .scenarios import Scenarios -class TestToolSet(CommonToolSet): +class SampleToolSet(CommonToolSet): """测试用工具集""" def __init__(self, timezone: str = "UTC"): @@ -233,9 +233,9 @@ def mocked_model( return model("mock-model") @pytest.fixture - def mocked_toolset(self) -> TestToolSet: + def mocked_toolset(self) -> SampleToolSet: """创建 mock 的工具集""" - return TestToolSet(timezone="UTC") + return SampleToolSet(timezone="UTC") # ========================================================================= # 测试:简单对话(无工具调用) @@ -275,7 +275,7 @@ def test_single_tool_call( self, mock_server: MockLLMServer, mocked_model: CommonModel, - mocked_toolset: TestToolSet, + mocked_toolset: SampleToolSet, ): """测试单次工具调用""" # 配置场景 @@ -306,7 +306,7 @@ def test_multi_tool_calls( self, mock_server: MockLLMServer, mocked_model: CommonModel, - mocked_toolset: TestToolSet, + mocked_toolset: SampleToolSet, ): """测试多工具同时调用""" # 使用默认的多工具场景 @@ -336,7 +336,7 @@ def test_stream_options_validation( self, mock_server: MockLLMServer, mocked_model: CommonModel, - mocked_toolset: TestToolSet, + mocked_toolset: SampleToolSet, ): """测试 stream_options 在请求中的正确性""" # 使用默认场景 @@ -377,7 +377,7 @@ async def test_async_invoke( self, mock_server: MockLLMServer, mocked_model: CommonModel, - mocked_toolset: TestToolSet, + mocked_toolset: SampleToolSet, ): """测试异步调用""" # 使用默认场景 diff --git a/tests/unittests/integration/test_pydanticai.py b/tests/unittests/integration/test_pydanticai.py index 2a5c713..9bea4b9 100644 --- a/tests/unittests/integration/test_pydanticai.py +++ b/tests/unittests/integration/test_pydanticai.py @@ -22,7 +22,7 @@ from .scenarios import Scenarios -class TestToolSet(CommonToolSet): +class SampleToolSet(CommonToolSet): """测试用工具集""" def __init__(self, timezone: str = "UTC"): @@ -203,9 +203,9 @@ def mocked_model( return model("mock-model") @pytest.fixture - def mocked_toolset(self) -> TestToolSet: + def mocked_toolset(self) -> SampleToolSet: """创建 mock 的工具集""" - return TestToolSet(timezone="UTC") + return SampleToolSet(timezone="UTC") # ========================================================================= # 测试:简单对话(无工具调用) @@ -245,7 +245,7 @@ def test_single_tool_call( self, mock_server: MockLLMServer, mocked_model: CommonModel, - mocked_toolset: TestToolSet, + mocked_toolset: SampleToolSet, ): """测试单次工具调用""" # 配置场景 @@ -276,7 +276,7 @@ def test_multi_tool_calls( self, mock_server: MockLLMServer, mocked_model: CommonModel, - mocked_toolset: TestToolSet, + mocked_toolset: SampleToolSet, ): """测试多工具同时调用""" # 使用默认的多工具场景 @@ -305,7 +305,7 @@ def test_stream_options_validation( self, mock_server: MockLLMServer, mocked_model: CommonModel, - mocked_toolset: TestToolSet, + mocked_toolset: SampleToolSet, ): """测试 stream_options 在请求中的正确性 @@ -349,7 +349,7 @@ async def test_async_invoke( self, mock_server: MockLLMServer, mocked_model: CommonModel, - mocked_toolset: TestToolSet, + mocked_toolset: SampleToolSet, ): """测试异步调用""" # 使用默认场景