|
12 | 12 | # limitations under the License. |
13 | 13 | # ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= |
14 | 14 |
|
15 | | -import asyncio |
16 | | -import contextvars |
17 | 15 | import logging |
18 | 16 | import uuid |
19 | | -from threading import Lock |
20 | 17 | from typing import Any, Callable |
21 | 18 |
|
22 | 19 | from camel.messages import BaseMessage |
23 | 20 | from camel.models import ModelFactory |
24 | 21 | from camel.toolkits import FunctionTool, RegisteredAgentToolkit |
25 | 22 | from camel.types import ModelPlatformType |
26 | 23 |
|
| 24 | +from app.utils.event_loop_utils import _schedule_async_task |
27 | 25 | from app.agent.listen_chat_agent import ListenChatAgent, logger |
28 | 26 | from app.model.chat import AgentModelConfig, Chat |
29 | 27 | from app.service.task import ActionCreateAgentData, Agents, get_task_lock |
30 | 28 |
|
31 | | -# Thread-safe reference to main event loop using contextvars |
32 | | -# This ensures each request has its own event loop reference, |
33 | | -# avoiding race conditions |
34 | | -_main_event_loop_var: contextvars.ContextVar[asyncio.AbstractEventLoop |
35 | | - | None] = contextvars.ContextVar( |
36 | | - "_main_event_loop", |
37 | | - default=None |
38 | | - ) |
39 | | - |
40 | | -# Global fallback for main event loop reference |
41 | | -# Used when contextvars don't propagate to worker threads |
42 | | -# (e.g., asyncio.to_thread) |
43 | | -_GLOBAL_MAIN_LOOP: asyncio.AbstractEventLoop | None = None |
44 | | -_GLOBAL_MAIN_LOOP_LOCK = Lock() |
45 | | - |
46 | | - |
47 | | -def set_main_event_loop(loop: asyncio.AbstractEventLoop | None): |
48 | | - """Set the main event loop reference for thread-safe task scheduling. |
49 | | -
|
50 | | - This should be called from the main async context before spawning threads |
51 | | - that need to schedule async tasks. Uses both contextvars (for request |
52 | | - isolation) and a global fallback (for thread pool workers where |
53 | | - contextvars may not propagate). |
54 | | - """ |
55 | | - global _GLOBAL_MAIN_LOOP |
56 | | - _main_event_loop_var.set(loop) |
57 | | - with _GLOBAL_MAIN_LOOP_LOCK: |
58 | | - _GLOBAL_MAIN_LOOP = loop |
59 | | - |
60 | | - |
61 | | -def _schedule_async_task(coro): |
62 | | - """Schedule an async coroutine as a task, thread-safe. |
63 | | -
|
64 | | - This function handles scheduling from both the main event loop thread |
65 | | - and from worker threads (e.g., when using asyncio.to_thread). |
66 | | - """ |
67 | | - try: |
68 | | - # Try to get the running loop (works in main event loop thread) |
69 | | - loop = asyncio.get_running_loop() |
70 | | - loop.create_task(coro) |
71 | | - except RuntimeError: |
72 | | - # No running loop in this thread (we're in a worker thread) |
73 | | - # First try contextvars, then fallback to global reference |
74 | | - main_loop = _main_event_loop_var.get() |
75 | | - if main_loop is None: |
76 | | - with _GLOBAL_MAIN_LOOP_LOCK: |
77 | | - main_loop = _GLOBAL_MAIN_LOOP |
78 | | - if main_loop is not None and main_loop.is_running(): |
79 | | - asyncio.run_coroutine_threadsafe(coro, main_loop) |
80 | | - else: |
81 | | - # This should not happen in normal operation - log error and skip |
82 | | - logging.error( |
83 | | - "No event loop available for async task " |
84 | | - "scheduling, task skipped. Ensure " |
85 | | - "set_main_event_loop() is called " |
86 | | - "before parallel agent creation." |
87 | | - ) |
88 | | - |
89 | 29 |
|
90 | 30 | def agent_model( |
91 | 31 | agent_name: str, |
|
0 commit comments