diff --git a/.env.example b/.env.example index 7d078aa91..7317369ea 100644 --- a/.env.example +++ b/.env.example @@ -25,6 +25,9 @@ FEISHU_REDIRECT_URI=http://localhost:3000/auth/feishu/callback # Without a key, the tools still work but with lower rate limits JINA_API_KEY= +# Exa API key (for exa_search tool and web_search Exa engine — get one at https://exa.ai) +EXA_API_KEY= + # Public app URL used in user-facing links, such as password reset emails. # Leave empty for auto-discovery from the browser request. # Set explicitly for production (e.g. https://your-domain.com) — required for diff --git a/README.md b/README.md index b2436faf9..c46f8878c 100644 --- a/README.md +++ b/README.md @@ -113,7 +113,7 @@ bash restart.sh git clone https://github.com/dataelement/Clawith.git cd Clawith && cp .env.example .env docker compose up -d -# → http://localhost:3000 +# → http://localhost:3008 ``` **To update an existing deployment:** diff --git a/README_zh-CN.md b/README_zh-CN.md index 5c5fbdc1f..4ffaf07d1 100644 --- a/README_zh-CN.md +++ b/README_zh-CN.md @@ -108,7 +108,7 @@ bash restart.sh git clone https://github.com/dataelement/Clawith.git cd Clawith && cp .env.example .env docker compose up -d -# → http://localhost:3000 +# → http://localhost:3008 ``` **更新已有部署:** diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 07f5e923a..98a4ba44a 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,48 +1,53 @@ -# v1.8.2 Release Notes +# v1.8.3-beta — A2A Async Communication, Image Context & Search Tools ## What's New -### Security -- **Fix account takeover via username collision** (#300): Prevents an attacker from creating an account with a username matching an existing SSO user's email, which could lead to unauthorized account access. -- **Fix duplicate user creation on repeated SSO logins**: Feishu and DingTalk SSO now correctly reuse existing accounts instead of creating duplicate users. 
- -### AgentBay — Cloud Computer & Browser Automation -- **New: `agentbay_file_transfer` tool**: Transfer files between any two environments — agent workspace, browser sandbox, cloud desktop (computer), or code sandbox — in any direction. -- **Fix: Computer Take Control (TC) white screen**: TC now connects to the correct environment session (computer vs. browser) based on `env_type`. Previously, an existing browser session could hijack the computer TC connection. -- **Fix: OS-aware desktop paths**: The `agentbay_file_transfer` tool description now automatically reflects the correct paths for the agent's configured OS type: - - Windows: `C:\Users\Administrator\Desktop\` - - Linux: `/home/wuying/Desktop/` -- **Fix: Desktop file refresh**: After uploading to the Linux desktop directory, GNOME is notified to refresh icon display. -- Multiple Take Control stability fixes: CDP polling replaced with sleep, multi-tab cleanup, 40s navigate timeout, unhashable type errors. - -### Feishu (Lark) — CardKit Streaming Cards -- Feishu bot responses now stream as animated typing-effect cards using the CardKit API (#287). -- Fixed SSE stream hang issues and websocket proxy bypass for system proxy conflicts. - -### DingTalk & Organization Sync -- Fixed DingTalk org sync permissions guide (`Contact.User.Read` scope). -- Fixed `open_id` vs `employee_id` user type handling in Feishu org sync. - -### Other Bug Fixes -- **Fix: SSE stream protection** — `finish_reason` break guard added for OpenAI and Gemini streams to prevent runaway streams. -- **Fix: Duplicate tool `send_feishu_message`** — Removed duplicate DB entry; added dedup guard in tool loading to prevent `Tool names must be unique` LLM errors. -- **Fix: JWT token not consumed** on reset-password and verify-email routes. -- **Fix: NULL username/email** for SSO-created users in `list_users`. -- **Fix: Company name slug generation** — Added `anyascii` + `pypinyin` for universal CJK/Latin transliteration. 
-- **Fix: `publish_page` URL** — Correctly generates `try.clawith.ai` links on source deployments.
-- **Fix: Agent template directory** — Dynamic default for source deployments.
-- Various i18n fixes (TakeControlPanel, DingTalk guide).
+### Agent-to-Agent (A2A) Async Communication — Beta
+- **Three communication modes** for `send_message_to_agent`:
+  - `notify` — fire-and-forget, one-way announcement
+  - `task_delegate` — delegate work and get results back asynchronously via the `on_message` trigger
+  - `consult` — synchronous question-and-reply (original behaviour)
+- **Feature flag**: controlled at the tenant level via Company Settings → Company Info → A2A Async toggle (default: **OFF**)
+- When disabled, the `msg_type` parameter is **hidden from the LLM**, so agents only see the synchronous consult mode
+- Security: chain-depth protection (max 3 hops), regex filtering of internal terms, SQL injection prevention
+- Performance: async wake sessions are limited to 2 tool rounds
+
+### Multimodal Image Context
+- Base64 image markers are now persisted to the database at write time
+- Chat UI correctly strips `[image_data:]` markers and renders thumbnails
+- Fixed chat page vertical scrolling (flexbox `min-height: 0` constraint)
+- Removed deprecated `/agents/:id/chat` route
+
+### Search Engine Tools
+- New `Exa Search` tool — AI-powered semantic search with category filtering
+- New standalone search engine tools: DuckDuckGo, Tavily, Google, and Bing (each as its own tool)
+
+### UI Improvements
+- Drag-and-drop file upload across the application
+- Chat sidebar polish: segmented control, session item styling
+- Agent-to-agent sessions are now visible in the admin "Other Users" tab
+
+### Bug Fixes
+- Added rate limiting to DingTalk org sync to prevent API throttling
+- Tool seeder: `parameters_schema` is now correctly included in new tool INSERTs
+- Unified `msg_type` enum references across the codebase
+- Docker access port corrected to 3008
 
 ---
 
 ## Upgrade Guide
 
-> **No database migrations required.** No new 
environment variables. +> **Database migration required.** Run `alembic upgrade heads` to add the `a2a_async_enabled` column. ### Docker Deployment (Recommended) ```bash git pull origin main + +# Run database migration +docker exec clawith-backend-1 alembic upgrade heads + +# Rebuild and restart docker compose down && docker compose up -d --build ``` @@ -51,8 +56,8 @@ docker compose down && docker compose up -d --build ```bash git pull origin main -# Install new Python dependency -pip install anyascii>=0.3.2 +# Run database migration +alembic upgrade heads # Rebuild frontend cd frontend && npm install && npm run build @@ -61,23 +66,13 @@ cd .. # Restart services ``` -### nginx Update Required - -A new routing rule has been added to `nginx.conf`. If you manage nginx separately (not via Docker), add this block inside your `server {}` before the WebSocket proxy section: - -```nginx -location ~ ^/WW_verify_[A-Za-z0-9]+\.txt$ { - proxy_pass http://backend:8000/api/wecom-verify$request_uri; - proxy_set_header Host $http_host; - proxy_set_header X-Real-IP $remote_addr; -} -``` - ### Kubernetes (Helm) ```bash helm upgrade clawith helm/clawith/ -f values.yaml +# Run migration job for a2a_async_enabled column ``` -No migration job needed. - +### Notes +- The A2A Async feature is **disabled by default**. No behaviour changes until explicitly enabled. +- The `a2a_async_enabled` column defaults to `FALSE`, so existing tenants are unaffected. diff --git a/backend/VERSION b/backend/VERSION index 53adb84c8..8f1278c4f 100644 --- a/backend/VERSION +++ b/backend/VERSION @@ -1 +1 @@ -1.8.2 +1.8.3-beta diff --git a/backend/alembic/versions/add_a2a_async_enabled.py b/backend/alembic/versions/add_a2a_async_enabled.py new file mode 100644 index 000000000..dc7d69f07 --- /dev/null +++ b/backend/alembic/versions/add_a2a_async_enabled.py @@ -0,0 +1,24 @@ +"""Add a2a_async_enabled column to tenants table. 
+ +Revision ID: f1a2b3c4d5e6 +Revises: d9cbd43b62e5 +Create Date: 2026-04-10 02:50:00.000000 +""" +from alembic import op + + +revision = "f1a2b3c4d5e6" +down_revision = "d9cbd43b62e5" + + +def upgrade() -> None: + op.execute( + "ALTER TABLE agents DROP COLUMN IF EXISTS a2a_async_enabled" + ) + op.execute( + "ALTER TABLE tenants ADD COLUMN IF NOT EXISTS a2a_async_enabled BOOLEAN DEFAULT FALSE" + ) + + +def downgrade() -> None: + op.execute("ALTER TABLE tenants DROP COLUMN IF EXISTS a2a_async_enabled") diff --git a/backend/app/api/dingtalk.py b/backend/app/api/dingtalk.py index 9a431ecf2..41ac2cd61 100644 --- a/backend/app/api/dingtalk.py +++ b/backend/app/api/dingtalk.py @@ -57,7 +57,7 @@ async def configure_dingtalk_channel( existing.is_configured = True existing.extra_config = {**existing.extra_config, "connection_mode": conn_mode, "agent_id": dingtalk_agent_id} await db.flush() - + # Restart Stream client if in websocket mode if conn_mode == "websocket": from app.services.dingtalk_stream import dingtalk_stream_manager @@ -68,7 +68,7 @@ async def configure_dingtalk_channel( from app.services.dingtalk_stream import dingtalk_stream_manager import asyncio asyncio.create_task(dingtalk_stream_manager.stop_client(agent_id)) - + return ChannelConfigOut.model_validate(existing) config = ChannelConfig( @@ -145,8 +145,19 @@ async def process_dingtalk_message( conversation_id: str, conversation_type: str, session_webhook: str, + image_base64_list: list[str] | None = None, + saved_file_paths: list[str] | None = None, + sender_nick: str = "", + message_id: str = "", ): - """Process an incoming DingTalk bot message and reply via session webhook.""" + """Process an incoming DingTalk bot message and reply via session webhook. + + Args: + image_base64_list: List of base64-encoded image data URIs for vision LLM. + saved_file_paths: List of local file paths where media files were saved. + sender_nick: Display name of the sender from DingTalk. 
+ message_id: DingTalk message ID (used for reactions). + """ import json import httpx from datetime import datetime, timezone @@ -207,21 +218,146 @@ async def process_dingtalk_message( ) history = [{"role": m.role, "content": m.content} for m in reversed(history_r.scalars().all())] + # Build saved_content for DB (no base64 blobs, keep it display-friendly) + import re as _re_dt + _clean_text = _re_dt.sub( + r'\[image_data:data:image/[^;]+;base64,[A-Za-z0-9+/=]+\]', + "", user_text, + ).strip() + if saved_file_paths: + from pathlib import Path as _PathDT + _file_prefixes = "\n".join( + f"[file:{_PathDT(p).name}]" for p in saved_file_paths + ) + saved_content = f"{_file_prefixes}\n{_clean_text}".strip() if _clean_text else _file_prefixes + else: + saved_content = _clean_text or user_text + # Save user message db.add(ChatMessage( agent_id=agent_id, user_id=platform_user_id, - role="user", content=user_text, + role="user", content=saved_content, conversation_id=session_conv_id, )) sess.last_message_at = datetime.now(timezone.utc) await db.commit() + # Build LLM input text: for images, inject base64 markers so vision models can see them + llm_user_text = user_text + if image_base64_list: + image_markers = "\n".join( + f"[image_data:{uri}]" for uri in image_base64_list + ) + llm_user_text = f"{user_text}\n{image_markers}" if user_text else image_markers + + # ── Set up channel_file_sender so the agent can send files via DingTalk ── + from app.services.agent_tools import channel_file_sender as _cfs + from app.services.dingtalk_stream import ( + _upload_dingtalk_media, + _send_dingtalk_media_message, + ) + + # Load DingTalk credentials from ChannelConfig + _dt_cfg_r = await db.execute( + _select(ChannelConfig).where( + ChannelConfig.agent_id == agent_id, + ChannelConfig.channel_type == "dingtalk", + ) + ) + _dt_cfg = _dt_cfg_r.scalar_one_or_none() + _dt_app_key = _dt_cfg.app_id if _dt_cfg else None + _dt_app_secret = _dt_cfg.app_secret if _dt_cfg else None + + _cfs_token = 
None
+    if _dt_app_key and _dt_app_secret:
+        # Determine send target: group -> conversation_id, P2P -> sender_staff_id
+        _dt_target_id = conversation_id if conversation_type == "2" else sender_staff_id
+        _dt_conv_type = conversation_type
+
+        async def _dingtalk_file_sender(file_path: str, msg: str = ""):
+            """Send a file/image/video via the DingTalk proactive message API."""
+            from pathlib import Path as _P
+
+            _fp = _P(file_path)
+            _ext = _fp.suffix.lower()
+
+            # Determine media type from extension
+            if _ext in (".jpg", ".jpeg", ".png", ".gif", ".bmp", ".webp"):
+                _media_type = "image"
+            elif _ext in (".mp4", ".mov", ".avi", ".mkv"):
+                _media_type = "video"
+            elif _ext in (".mp3", ".wav", ".ogg", ".amr", ".m4a"):
+                _media_type = "voice"
+            else:
+                _media_type = "file"
+
+            # Upload media to DingTalk
+            _mid = await _upload_dingtalk_media(
+                _dt_app_key, _dt_app_secret, file_path, _media_type
+            )
+
+            if _mid:
+                # Send via proactive message API
+                _ok = await _send_dingtalk_media_message(
+                    _dt_app_key, _dt_app_secret,
+                    _dt_target_id, _mid, _media_type,
+                    _dt_conv_type, filename=_fp.name,
+                )
+                if _ok:
+                    # Also send the accompanying text if provided
+                    if msg:
+                        try:
+                            async with httpx.AsyncClient(timeout=10) as _cl:
+                                await _cl.post(session_webhook, json={
+                                    "msgtype": "text",
+                                    "text": {"content": msg},
+                                })
+                        except Exception:
+                            pass
+                    return
+
+            # Fallback: send a text message with file info
+            _fallback_parts = []
+            if msg:
+                _fallback_parts.append(msg)
+            _fallback_parts.append(f"[File: {_fp.name}]")
+            try:
+                async with httpx.AsyncClient(timeout=10) as _cl:
+                    await _cl.post(session_webhook, json={
+                        "msgtype": "text",
+                        "text": {"content": "\n\n".join(_fallback_parts)},
+                    })
+            except Exception as _fb_err:
+                logger.error(f"[DingTalk] Fallback file text also failed: {_fb_err}")
+
+        _cfs_token = _cfs.set(_dingtalk_file_sender)
+
     # Call LLM
-    reply_text = await 
_call_agent_llm( + db, agent_id, llm_user_text, + history=history, user_id=platform_user_id, + ) + finally: + # Reset ContextVar + if _cfs_token is not None: + _cfs.reset(_cfs_token) + # Recall thinking reaction (before sending reply) + if message_id and _dt_app_key: + try: + from app.services.dingtalk_reaction import recall_thinking_reaction + await recall_thinking_reaction( + _dt_app_key, _dt_app_secret, + message_id, conversation_id, + ) + except Exception as _recall_err: + logger.warning(f"[DingTalk] Failed to recall thinking reaction: {_recall_err}") + + has_media = bool(image_base64_list or saved_file_paths) + logger.info( + f"[DingTalk] LLM reply ({'media' if has_media else 'text'} input): " + f"{reply_text[:100]}" ) - logger.info(f"[DingTalk] LLM reply: {reply_text[:100]}") # Reply via session webhook (markdown) try: diff --git a/backend/app/api/gateway.py b/backend/app/api/gateway.py index 8182562ba..0f6f6010e 100644 --- a/backend/app/api/gateway.py +++ b/backend/app/api/gateway.py @@ -413,7 +413,7 @@ async def _send_to_agent_background( "--- Agent-to-Agent Communication Alert ---\n" f"You are receiving a direct message from another digital employee ({source_agent_name}). " "CRITICAL INSTRUCTION: Your direct text reply will automatically be delivered back to them. " - "DO NOT use the `send_agent_message` tool to reply to this conversation. Just reply naturally in text.\n" + "DO NOT use the `send_message_to_agent` tool to reply to this conversation. Just reply naturally in text.\n" "If they are asking you to create or analyze a file, deliver the file using `send_file_to_agent` after writing it." 
) diff --git a/backend/app/api/relationships.py b/backend/app/api/relationships.py index 5f56297ee..19f0b77f9 100644 --- a/backend/app/api/relationships.py +++ b/backend/app/api/relationships.py @@ -303,7 +303,7 @@ async def _regenerate_relationships_file(db: AsyncSession, agent_id: uuid.UUID): label = AGENT_RELATION_LABELS.get(r.relation, r.relation) lines.append(f"### {a.name} — {a.role_description or '数字员工'}") lines.append(f"- 关系:{label}") - lines.append(f"- 可以用 send_agent_message 工具给 {a.name} 发消息协作") + lines.append(f"- 可以用 send_message_to_agent 工具给 {a.name} 发消息协作") if r.description: lines.append(f"- {r.description}") lines.append("") diff --git a/backend/app/api/tenants.py b/backend/app/api/tenants.py index 1e8c26b14..c55f479c0 100644 --- a/backend/app/api/tenants.py +++ b/backend/app/api/tenants.py @@ -37,6 +37,7 @@ class TenantOut(BaseModel): is_active: bool sso_enabled: bool = False sso_domain: str | None = None + a2a_async_enabled: bool = False created_at: datetime | None = None model_config = {"from_attributes": True} @@ -49,6 +50,7 @@ class TenantUpdate(BaseModel): is_active: bool | None = None sso_enabled: bool | None = None sso_domain: str | None = None + a2a_async_enabled: bool | None = None # ─── Helpers ──────────────────────────────────────────── diff --git a/backend/app/api/websocket.py b/backend/app/api/websocket.py index dd466ed2e..ad57d3a36 100644 --- a/backend/app/api/websocket.py +++ b/backend/app/api/websocket.py @@ -121,6 +121,7 @@ async def call_llm( on_tool_call=None, on_thinking=None, supports_vision=False, + max_tool_rounds_override: int | None = None, ) -> str: """Call LLM via unified client with function-calling tool loop. 
@@ -142,6 +143,8 @@ async def call_llm( _agent = _ar.scalar_one_or_none() if _agent: _max_tool_rounds = _agent.max_tool_rounds or 50 + if max_tool_rounds_override and max_tool_rounds_override < _max_tool_rounds: + _max_tool_rounds = max_tool_rounds_override if _agent.max_tokens_per_day and _agent.tokens_used_today >= _agent.max_tokens_per_day: return f"⚠️ Daily token usage has reached the limit ({_agent.tokens_used_today:,}/{_agent.max_tokens_per_day:,}). Please try again tomorrow or ask admin to increase the limit." if _agent.max_tokens_per_month and _agent.tokens_used_month >= _agent.max_tokens_per_month: @@ -149,6 +152,9 @@ async def call_llm( except Exception: pass + if max_tool_rounds_override and max_tool_rounds_override < _max_tool_rounds: + _max_tool_rounds = max_tool_rounds_override + # Build rich prompt with soul, memory, skills, relationships from app.services.agent_context import build_agent_context # Look up current user's display name so the agent knows who it's talking to @@ -678,11 +684,28 @@ async def websocket_chat( # Add user message to conversation (full LLM context) conversation.append({"role": "user", "content": content}) - # Save user message — display_content for history display, content for LLM - # Prefix with [file:name] if there's a file attachment so history can show it - saved_content = display_content if display_content else content - if file_name: - saved_content = f"[file:{file_name}]\n{saved_content}" + # Save user message to DB. + # + # Strategy: + # - If the LLM content contains [image_data:...] markers (i.e. the user + # attached an image and the model supports vision), persist the FULL + # content including the base64 marker. This makes history self-contained + # so subsequent turns can forward the image to the LLM without any + # disk-based rehydration step. + # - For all other messages (text, non-image files) use display_content for + # cleaner history (avoids e.g. the raw file-text blob appearing in chat). 
+ # + # The call_llm() path already strips [image_data:] for non-vision models + # (websocket.py ~line 210), so no extra handling is needed at read time. + HAS_IMAGE_MARKER = "[image_data:" in content + if HAS_IMAGE_MARKER: + # Preserve the full LLM content (includes base64) for multi-turn context. + # Prefix with [file:name] for the UI history parser if a file name exists. + saved_content = f"[file:{file_name}]\n{content}" if file_name else content + else: + saved_content = display_content if display_content else content + if file_name: + saved_content = f"[file:{file_name}]\n{saved_content}" async with async_session() as db: user_msg = ChatMessage( agent_id=agent_id, @@ -703,7 +726,7 @@ async def websocket_chat( if _sess: _sess.last_message_at = _now if not history_messages and _sess.title.startswith("Session "): - # Use display_content for title (avoids raw base64/markers) + # Always use display_content for title (never expose raw base64) title_src = display_content if display_content else content # Clean up common prefixes from image/file messages clean_title = title_src.replace("[图片] ", "📷 ").replace("[image_data:", "").strip() @@ -713,6 +736,7 @@ async def websocket_chat( await db.commit() logger.info("[WS] User message saved") + # ── OpenClaw routing: insert into gateway_messages instead of LLM ── if agent_type == "openclaw": from app.models.gateway_message import GatewayMessage as GwMsg diff --git a/backend/app/config.py b/backend/app/config.py index 7d1b2a54e..ee9ab99bd 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -103,6 +103,9 @@ class Settings(BaseSettings): # Jina AI (Reader + Search APIs) JINA_API_KEY: str = "" + # Exa AI (Search API) + EXA_API_KEY: str = "" + # Sandbox configuration SANDBOX_TYPE: SandboxType = SandboxType.SUBPROCESS diff --git a/backend/app/models/tenant.py b/backend/app/models/tenant.py index 113238c62..ada852476 100644 --- a/backend/app/models/tenant.py +++ b/backend/app/models/tenant.py @@ -49,3 +49,7 @@ class 
Tenant(Base): min_poll_interval_floor: Mapped[int] = mapped_column(Integer, default=5) max_webhook_rate_ceiling: Mapped[int] = mapped_column(Integer, default=5) + # A2A async communication (notify / task_delegate) + # When False, all agent-to-agent messages use synchronous consult mode + a2a_async_enabled: Mapped[bool] = mapped_column(Boolean, default=False) + diff --git a/backend/app/services/agent_tools.py b/backend/app/services/agent_tools.py index f1880d172..adbdb9408 100644 --- a/backend/app/services/agent_tools.py +++ b/backend/app/services/agent_tools.py @@ -532,7 +532,7 @@ async def _get_tool_config(agent_id: Optional[uuid.UUID], tool_name: str) -> Opt "type": "function", "function": { "name": "send_message_to_agent", - "description": "Send a message to a digital employee colleague and receive a reply. The recipient is another AI agent, not a human. This triggers the recipient's LLM reasoning and returns their response. Suitable for asking questions, delegating tasks, or collaboration. Your relationships.md lists available digital employees under 'Digital Employee Colleagues'.", + "description": "Send a message to a digital employee colleague. The recipient is another AI agent, not a human. Your relationships.md lists available digital employees under 'Digital Employee Colleagues'.\n\nDECISION GUIDE for msg_type:\nAsk yourself: does the target agent need to DO WORK (analyze, research, summarize, write, compare, plan, etc.) and RETURN RESULTS to you or the user?\n\n- If YES, the target needs to do work → use task_delegate. Examples: 'summarize X', 'analyze Y', 'check Z', 'prepare a report', 'review and give feedback', 'find out X', 'confirm with X and report back'. The target works asynchronously and you will be woken when they finish.\n\n- If the target just needs to KNOW something → use notify. Examples: 'meeting cancelled', 'I updated the doc', 'heads up about X', 'FYI'. No reply expected.\n\n- If you need a quick factual answer right now → use consult. 
Examples: 'what is X?', 'do you know Y?'. Synchronous, blocks until reply.\n\nWhen in doubt between notify and task_delegate, prefer task_delegate — it is safer because it guarantees the user gets a result.", "parameters": { "type": "object", "properties": { @@ -547,10 +547,10 @@ async def _get_tool_config(agent_id: Optional[uuid.UUID], tool_name: str) -> Opt "msg_type": { "type": "string", "enum": ["notify", "consult", "task_delegate"], - "description": "Message type: notify (notification), consult (ask a question), task_delegate (delegate a task). Defaults to notify.", + "description": "Decision guide: (1) Will the target need to DO WORK and return results? → task_delegate. (2) Is this just a one-way FYI? → notify. (3) Quick factual question needing immediate answer? → consult. When unsure, prefer task_delegate.", }, }, - "required": ["agent_name", "message"], + "required": ["agent_name", "message", "msg_type"], }, }, }, @@ -1801,6 +1801,37 @@ async def _agent_has_any_channel(agent_id: uuid.UUID) -> bool: # ─── Dynamic Tool Loading from DB ────────────────────────────── + +def _strip_a2a_msg_type(tools: list[dict]) -> list[dict]: + """Remove the msg_type parameter from send_message_to_agent when async A2A is disabled. + + This prevents the LLM from seeing and selecting notify/task_delegate modes + that would be silently overridden to consult anyway, which confuses users + who see the tool call arguments in the chat UI. + """ + import copy + result = [] + for t in tools: + fn = t.get("function", {}) + if fn.get("name") == "send_message_to_agent": + t = copy.deepcopy(t) + fn = t["function"] + # Simplify description to only mention consult + fn["description"] = ( + "Send a message to a digital employee colleague and receive their reply synchronously." 
+ ) + params = fn.get("parameters", {}) + props = params.get("properties", {}) + # Remove msg_type parameter entirely + props.pop("msg_type", None) + # Remove msg_type from required list + req = params.get("required", []) + if "msg_type" in req: + params["required"] = [r for r in req if r != "msg_type"] + result.append(t) + return result + + async def get_agent_tools_for_llm(agent_id: uuid.UUID) -> list[dict]: """Load enabled tools for an agent from DB (OpenAI function-calling format). @@ -1811,11 +1842,31 @@ async def get_agent_tools_for_llm(agent_id: uuid.UUID) -> list[dict]: Also patches agentbay_file_transfer description with OS-specific paths based on the agent's computer tool configuration (os_type: 'windows' | 'linux'). + + When the tenant's a2a_async_enabled flag is False, the msg_type parameter is + removed from the send_message_to_agent tool so the LLM only sees the + synchronous consult behaviour. """ has_feishu = await _agent_has_feishu(agent_id) has_any_channel = await _agent_has_any_channel(agent_id) _always_tools = _always_core_tools + (_feishu_tools if has_feishu else []) + (_channel_tools if has_any_channel else []) + # Check tenant-level a2a_async_enabled flag + _a2a_async = False + try: + from app.models.tenant import Tenant + from app.models.agent import Agent as AgentModel + async with async_session() as _flag_db: + _ag_r = await _flag_db.execute(select(AgentModel.tenant_id).where(AgentModel.id == agent_id)) + _tid = _ag_r.scalar_one_or_none() + if _tid: + _t_r = await _flag_db.execute(select(Tenant).where(Tenant.id == _tid)) + _tenant = _t_r.scalar_one_or_none() + if _tenant: + _a2a_async = getattr(_tenant, "a2a_async_enabled", False) + except Exception: + pass + # Read os_type once; used to patch agentbay_file_transfer paths below computer_os_type = await _get_computer_os_type(agent_id) @@ -1878,12 +1929,19 @@ async def get_agent_tools_for_llm(agent_id: uuid.UUID) -> list[dict]: if t["function"]["name"] not in db_tool_names: result.append(t) 
# Inject OS-aware paths into computer-related tool descriptions - return _patch_computer_tool_descriptions(result, computer_os_type) + result = _patch_computer_tool_descriptions(result, computer_os_type) + # Strip msg_type from send_message_to_agent when async A2A is disabled + if not _a2a_async: + result = _strip_a2a_msg_type(result) + return result except Exception as e: logger.error(f"[Tools] DB load failed, using fallback: {e}") # Fallback to hardcoded tools (still apply OS-aware path patching) - return _patch_computer_tool_descriptions(AGENT_TOOLS, computer_os_type) + fallback = _patch_computer_tool_descriptions(AGENT_TOOLS, computer_os_type) + if not _a2a_async: + fallback = _strip_a2a_msg_type(fallback) + return fallback # ─── Workspace initialization ────────────────────────────────── @@ -2029,6 +2087,16 @@ async def _execute_tool_direct( return await _web_search(arguments, agent_id) elif tool_name == "jina_search": return await _jina_search(arguments) + elif tool_name == "exa_search": + return await _exa_search(arguments, agent_id) + elif tool_name == "duckduckgo_search": + return await _duckduckgo_search_tool(arguments) + elif tool_name == "tavily_search": + return await _tavily_search_tool(arguments, agent_id) + elif tool_name == "google_search": + return await _google_search_tool(arguments, agent_id) + elif tool_name == "bing_search": + return await _bing_search_tool(arguments, agent_id) elif tool_name == "send_feishu_message": return await _send_feishu_message(agent_id, arguments) elif tool_name == "send_message_to_agent": @@ -2187,8 +2255,16 @@ async def execute_tool( result = await _web_search(arguments, agent_id) elif tool_name == "jina_search": result = await _jina_search(arguments) + elif tool_name == "exa_search": + result = await _exa_search(arguments, agent_id) + elif tool_name == "duckduckgo_search": + result = await _duckduckgo_search_tool(arguments) + elif tool_name == "tavily_search": + result = await _tavily_search_tool(arguments, 
agent_id) + elif tool_name == "google_search": + result = await _google_search_tool(arguments, agent_id) elif tool_name == "bing_search": - result = await _jina_search(arguments) # redirect legacy to jina + result = await _bing_search_tool(arguments, agent_id) elif tool_name == "jina_read": result = await _jina_read(arguments) elif tool_name == "read_webpage": @@ -2364,6 +2440,8 @@ async def _web_search(arguments: dict, agent_id: uuid.UUID | None = None) -> str return await _search_google(query, api_key, max_results, language) elif engine == "bing" and api_key: return await _search_bing(query, api_key, max_results, language) + elif engine == "exa" and api_key: + return await _search_exa(query, api_key, max_results) else: return await _search_duckduckgo(query, max_results) except Exception as e: @@ -2586,6 +2664,190 @@ async def _search_bing(query: str, api_key: str, max_results: int, language: str return f'🔍 Bing search for "{query}" ({len(results)} items):\n\n' + "\n\n---\n\n".join(results) +async def _search_exa(query: str, api_key: str, max_results: int) -> str: + """Search via Exa AI API (exa.ai). 
Used by the web_search engine selector.""" + import httpx + + async with httpx.AsyncClient() as client: + resp = await client.post( + "https://api.exa.ai/search", + json={ + "query": query, + "type": "auto", + "numResults": max_results, + "contents": {"text": {"maxCharacters": 1000}}, + }, + headers={ + "x-api-key": api_key, + "Content-Type": "application/json", + "x-exa-integration": "clawith", + }, + timeout=15, + ) + data = resp.json() + + if resp.status_code != 200: + return f"❌ Exa search failed: {data.get('error', data.get('message', str(data)[:200]))}" + + results = [] + for r in data.get("results", [])[:max_results]: + title = r.get("title", "Untitled") + url = r.get("url", "") + text = (r.get("text") or "")[:300] + results.append(f"**{title}**\n{url}\n{text}") + + if not results: + return f'🔍 No results found for "{query}"' + return f'🔍 Exa search for "{query}" ({len(results)} items):\n\n' + "\n\n---\n\n".join(results) + + +async def _exa_search(arguments: dict, agent_id: uuid.UUID | None = None) -> str: + """Full-featured Exa AI search with category filtering, domain filtering, and content modes.""" + import httpx + + query = arguments.get("query", "").strip() + if not query: + return "❌ Please provide search keywords" + + config = await _get_tool_config(agent_id, "exa_search") or {} + api_key = config.get("api_key", "") or get_settings().EXA_API_KEY + if not api_key: + return "❌ Exa API key is required. Set it in tool settings or the EXA_API_KEY environment variable." 
+ + max_results = min(arguments.get("max_results", 5), 10) + search_type = arguments.get("search_type", "auto") + category = arguments.get("category") or None + content_mode = arguments.get("content_mode", "text") + include_domains = arguments.get("include_domains") + exclude_domains = arguments.get("exclude_domains") + + body: dict = { + "query": query, + "type": search_type, + "numResults": max_results, + "contents": {}, + } + + if category: + body["category"] = category + if include_domains: + body["includeDomains"] = [d.strip() for d in include_domains.split(",") if d.strip()] + if exclude_domains: + body["excludeDomains"] = [d.strip() for d in exclude_domains.split(",") if d.strip()] + + if content_mode == "highlights": + body["contents"]["highlights"] = {"numSentences": 3} + elif content_mode == "summary": + body["contents"]["summary"] = {} + else: + body["contents"]["text"] = {"maxCharacters": 1000} + + try: + async with httpx.AsyncClient() as client: + resp = await client.post( + "https://api.exa.ai/search", + json=body, + headers={ + "x-api-key": api_key, + "Content-Type": "application/json", + "x-exa-integration": "clawith", + }, + timeout=15, + ) + data = resp.json() + + if resp.status_code != 200: + return f"❌ Exa search failed: {data.get('error', data.get('message', str(data)[:200]))}" + + items = data.get("results", [])[:max_results] + if not items: + return f'🔍 No results found for "{query}"' + + parts = [] + for i, r in enumerate(items, 1): + title = r.get("title", "Untitled") + url = r.get("url", "") + content = "" + if content_mode == "highlights" and r.get("highlights"): + content = " ... ".join(r["highlights"]) + elif content_mode == "summary" and r.get("summary"): + content = r["summary"] + elif r.get("text"): + content = r["text"][:500] + parts.append(f"**{i}. 
{title}**\n{url}\n{content}") + + return f'🔍 Exa search for "{query}" ({len(items)} items):\n\n' + "\n\n---\n\n".join(parts) + + except Exception as e: + return f"❌ Exa search error: {str(e)[:300]}" + + + +# ── Standalone search engine tool wrappers ─────────────────────────────────── +# Each function reads its own tool config (agent > company > defaults) and +# delegates to the existing private search implementations above. + + +async def _duckduckgo_search_tool(arguments: dict) -> str: + """Standalone DuckDuckGo search tool (no API key required).""" + query = arguments.get("query", "").strip() + if not query: + return "Please provide search keywords" + max_results = min(arguments.get("max_results", 5), 10) + return await _search_duckduckgo(query, max_results) + + +async def _tavily_search_tool(arguments: dict, agent_id: uuid.UUID | None = None) -> str: + """Standalone Tavily search tool (API key read from per-tool config).""" + query = arguments.get("query", "").strip() + if not query: + return "Please provide search keywords" + config = await _get_tool_config(agent_id, "tavily_search") or {} + api_key = config.get("api_key", "").strip() + if not api_key: + return "Tavily API key is required. Set it in the tool settings." + max_results = min(arguments.get("max_results", 5), 10) + try: + return await _search_tavily(query, api_key, max_results) + except Exception as e: + return f"Tavily search error: {str(e)[:200]}" + + +async def _google_search_tool(arguments: dict, agent_id: uuid.UUID | None = None) -> str: + """Standalone Google Custom Search tool (API key read from per-tool config).""" + query = arguments.get("query", "").strip() + if not query: + return "Please provide search keywords" + config = await _get_tool_config(agent_id, "google_search") or {} + api_key = config.get("api_key", "").strip() + if not api_key: + return "Google Search API key is required (format: API_KEY:SEARCH_ENGINE_ID). Set it in the tool settings." 
+ # Allow per-call language override; fall back to tool config, then default + language = arguments.get("language") or config.get("language", "en") + max_results = min(arguments.get("max_results", 5), 10) + try: + return await _search_google(query, api_key, max_results, language) + except Exception as e: + return f"Google search error: {str(e)[:200]}" + + +async def _bing_search_tool(arguments: dict, agent_id: uuid.UUID | None = None) -> str: + """Standalone Bing Web Search tool (API key read from per-tool config).""" + query = arguments.get("query", "").strip() + if not query: + return "Please provide search keywords" + config = await _get_tool_config(agent_id, "bing_search") or {} + api_key = config.get("api_key", "").strip() + if not api_key: + return "Bing Search API key is required. Set it in the tool settings." + language = arguments.get("language") or config.get("language", "en-US") + max_results = min(arguments.get("max_results", 5), 10) + try: + return await _search_bing(query, api_key, max_results, language) + except Exception as e: + return f"Bing search error: {str(e)[:200]}" + + async def _send_channel_file(agent_id: uuid.UUID, ws: Path, arguments: dict) -> str: """Send a file to a person or back to the current channel. @@ -4376,13 +4638,196 @@ async def _send_file_to_agent(from_agent_id: uuid.UUID, ws: Path, args: dict) -> return f"❌ Agent file send error: {str(e)[:200]}" +async def _resolve_a2a_target( + db, from_agent_id: uuid.UUID, agent_name: str +) -> tuple[AgentModel | None, str | None]: + """Resolve the target agent for A2A communication. + + Returns (target_agent, error_message). If target is None, error_message + explains why. Caller is responsible for relationship / expiry checks. 
+ """ + src_result = await db.execute(select(AgentModel).where(AgentModel.id == from_agent_id)) + source_agent = src_result.scalar_one_or_none() + source_tenant_id = source_agent.tenant_id if source_agent else None + + base_filter = [AgentModel.id != from_agent_id] + if source_tenant_id: + base_filter.append(AgentModel.tenant_id == source_tenant_id) + + exact_result = await db.execute( + select(AgentModel).where(AgentModel.name == agent_name, *base_filter) + ) + target = exact_result.scalars().first() + if not target: + safe_name = agent_name.replace("%", "").replace("_", r"\_") + fuzzy_result = await db.execute( + select(AgentModel).where(AgentModel.name.ilike(f"%{safe_name}%"), *base_filter) + ) + target = fuzzy_result.scalars().first() + if not target: + rel_r = await db.execute( + select(AgentModel.name).join( + AgentAgentRelationship, + (AgentAgentRelationship.target_agent_id == AgentModel.id) & (AgentAgentRelationship.agent_id == from_agent_id) + ) + ) + rel_names = [n for (n,) in rel_r.all()] + return None, f"❌ No agent found matching '{agent_name}'. Your connected colleagues: {', '.join(rel_names) if rel_names else 'none — ask your administrator to set up relationships'}" + + return target, None + + +async def _ensure_a2a_session( + db, from_agent_id: uuid.UUID, target_id: uuid.UUID, source_name: str, owner_id: uuid.UUID +) -> tuple[ChatSession, str]: + """Find or create the ChatSession for a pair of agents. + + Returns (chat_session, session_id_str). 
+ """ + from app.models.participant import Participant + + session_agent_id = min(from_agent_id, target_id, key=str) + session_peer_id = max(from_agent_id, target_id, key=str) + sess_r = await db.execute( + select(ChatSession).where( + ChatSession.agent_id == session_agent_id, + ChatSession.peer_agent_id == session_peer_id, + ChatSession.source_channel == "agent", + ) + ) + chat_session = sess_r.scalar_one_or_none() + if not chat_session: + src_part_r = await db.execute(select(Participant).where(Participant.type == "agent", Participant.ref_id == from_agent_id)) + src_participant = src_part_r.scalar_one_or_none() + src_part_id = src_participant.id if src_participant else None + chat_session = ChatSession( + agent_id=session_agent_id, + user_id=owner_id, + title=f"{source_name} ↔ {(await db.execute(select(AgentModel.name).where(AgentModel.id == target_id))).scalar() or 'Unknown'}", + source_channel="agent", + participant_id=src_part_id, + peer_agent_id=session_peer_id, + ) + db.add(chat_session) + await db.flush() + return chat_session, str(chat_session.id) + + +async def _create_on_message_trigger( + agent_id: uuid.UUID, + trigger_name: str, + from_agent_name: str, + reason: str, + focus_ref: str | None = None, + notification_summary: str | None = None, +) -> None: + """Programmatically create an on_message trigger for an agent.""" + from app.models.trigger import AgentTrigger + + config: dict = {"from_agent_name": from_agent_name} + if notification_summary: + config["_notification_summary"] = notification_summary + + try: + from app.models.audit import ChatMessage as _CM + from app.models.chat_session import ChatSession as _CS + from sqlalchemy import cast as sa_cast, String as SaString + async with async_session() as _snap_db: + _snap_q = select(_CM.created_at).join( + _CS, _CM.conversation_id == sa_cast(_CS.id, SaString) + ).where( + _CS.agent_id == agent_id, + _CM.created_at.isnot(None), + ).order_by(_CM.created_at.desc()).limit(1) + _snap_r = await 
_snap_db.execute(_snap_q) + _latest_ts = _snap_r.scalar_one_or_none() + if _latest_ts: + config["_since_ts"] = _latest_ts.isoformat() + except Exception: + pass + + async with async_session() as db: + result = await db.execute( + select(AgentTrigger).where( + AgentTrigger.agent_id == agent_id, + AgentTrigger.name == trigger_name, + ) + ) + existing = result.scalar_one_or_none() + if existing: + if existing.is_enabled: + existing.config = {**(existing.config or {}), **config} + existing.reason = reason + if focus_ref: + existing.focus_ref = focus_ref + await db.commit() + return + else: + existing.type = "on_message" + existing.config = config + existing.reason = reason + existing.focus_ref = focus_ref or None + existing.is_enabled = True + await db.commit() + return + + trigger = AgentTrigger( + agent_id=agent_id, + name=trigger_name, + type="on_message", + config=config, + reason=reason, + focus_ref=focus_ref or None, + max_fires=1, + expires_at=datetime.now(timezone.utc) + timedelta(hours=24), + ) + db.add(trigger) + await db.commit() + + +async def _append_focus_item(agent_id: uuid.UUID, identifier: str, description: str) -> None: + """Append a pending focus item to the agent's focus.md.""" + focus_path = WORKSPACE_ROOT / str(agent_id) / "focus.md" + line = f"- [ ] {identifier}: {description}\n" + try: + if focus_path.exists(): + content = focus_path.read_text(encoding="utf-8") + if identifier in content: + return + if not content.endswith("\n"): + content += "\n" + content += line + else: + content = f"# Focus\n\n{line}" + focus_path.parent.mkdir(parents=True, exist_ok=True) + focus_path.write_text(content, encoding="utf-8") + except Exception as e: + logger.warning(f"[A2A] Failed to update focus.md for agent {agent_id}: {e}") + + +async def _wake_agent_async(agent_id: uuid.UUID, reason_context: str, *, from_agent_id: uuid.UUID | None = None, skip_dedup: bool = False) -> None: + """Wake an agent asynchronously via the trigger invocation path. 
+ + Delegates to the public wake_agent_with_context API in trigger_daemon. + """ + from app.services.trigger_daemon import wake_agent_with_context + await wake_agent_with_context(agent_id, reason_context, from_agent_id=from_agent_id, skip_dedup=skip_dedup) + + async def _send_message_to_agent(from_agent_id: uuid.UUID, args: dict) -> str: - """Send a message to another digital employee. Uses a single request-response pattern: - the source agent sends a message, the target agent replies once, and the result is returned. - If the source agent needs to continue the conversation, it can call this tool again. + """Send a message to another digital employee. + + Behaviour depends on ``msg_type``: + - notify: fire-and-forget — message is saved, target is woken asynchronously. + Returns immediately. + - task_delegate: async with callback — message is saved, source agent sets up + a focus item + on_message trigger so it is notified when the + target completes the task. Returns immediately. + - consult: synchronous request-response (original behaviour). """ agent_name = args.get("agent_name", "").strip() message_text = args.get("message", "").strip() + msg_type = args.get("msg_type", "notify").strip().lower() if not agent_name or not message_text: return "❌ Please provide target agent name and message content" @@ -4513,6 +4958,119 @@ async def _send_message_to_agent(from_agent_id: uuid.UUID, args: dict) -> str: status_hint = "online" if online else "offline (message will be delivered on next heartbeat)" return f"✅ Message sent to {target.name} (OpenClaw agent, currently {status_hint}). The message has been queued and will be delivered when the agent polls for updates." 
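Reviewer note on `_ensure_a2a_session` above: it canonicalizes the agent pair with `min(..., key=str)` / `max(..., key=str)`, so A→B and B→A conversations resolve to the same session row. A minimal sketch of that invariant (the UUID literals and the `canonical_pair` helper name are illustrative, not part of this diff):

```python
import uuid

def canonical_pair(x: uuid.UUID, y: uuid.UUID) -> tuple[uuid.UUID, uuid.UUID]:
    # Same ordering rule as _ensure_a2a_session: compare the string forms
    return min(x, y, key=str), max(x, y, key=str)

a = uuid.UUID("11111111-1111-1111-1111-111111111111")
b = uuid.UUID("22222222-2222-2222-2222-222222222222")

# Both call directions map to the same (agent_id, peer_agent_id) session key
assert canonical_pair(a, b) == canonical_pair(b, a) == (a, b)
```

Ordering by `str` rather than raw UUID comparison keeps the rule stable even if the two IDs ever arrive as strings from an API boundary.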
+ # ── Native target: branch by msg_type ── + + # Save source message (common to all paths) + db.add(ChatMessage( + agent_id=session_agent_id, + user_id=owner_id, + role="user", + content=message_text, + conversation_id=session_id, + participant_id=src_participant.id if src_participant else None, + )) + chat_session.last_message_at = datetime.now(timezone.utc) + await db.commit() + + # ── Feature flag: async A2A (tenant-level) ── + _a2a_async = False + if source_agent.tenant_id: + try: + from app.models.tenant import Tenant + _t_r = await db.execute(select(Tenant).where(Tenant.id == source_agent.tenant_id)) + _tenant = _t_r.scalar_one_or_none() + if _tenant: + _a2a_async = getattr(_tenant, "a2a_async_enabled", False) + except Exception: + pass + if not _a2a_async: + if msg_type in ("notify", "task_delegate"): + msg_type = "consult" + + # ── notify: fire-and-forget ── + if msg_type == "notify": + try: + from app.services.activity_logger import log_activity + await log_activity( + from_agent_id, "agent_msg_sent", + f"Sent notification to {target.name}", + detail={"partner": target.name, "message": message_text[:200], "msg_type": "notify"}, + ) + except Exception: + pass + + try: + await _wake_agent_async( + target.id, + f"[From {source_name}] {message_text}", + from_agent_id=from_agent_id, + skip_dedup=True, + ) + except Exception as e: + logger.warning(f"[A2A] Failed to wake {target.name} for notify: {e}") + + return f"✅ Notification sent to {target.name}. They will process it asynchronously." 
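The tenant-level feature flag above downgrades the async modes when `a2a_async_enabled` is off. That rule reduces to a small pure function; a sketch for reviewers (the `effective_msg_type` name is illustrative, not part of this diff):

```python
def effective_msg_type(requested: str, a2a_async_enabled: bool) -> str:
    # Without the tenant flag, async modes fall back to the synchronous
    # consult path (mirrors the downgrade in _send_message_to_agent)
    if not a2a_async_enabled and requested in ("notify", "task_delegate"):
        return "consult"
    return requested

assert effective_msg_type("notify", False) == "consult"
assert effective_msg_type("task_delegate", True) == "task_delegate"
```

Keeping the downgrade at the top of the branch means the notify/task_delegate code paths never run for tenants that have not opted in.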
+ + # ── task_delegate: async with callback ── + if msg_type == "task_delegate": + focus_id = f"wait_{target.name.lower().replace(' ', '_')}_task" + focus_desc = f"Waiting for {target.name} to complete delegated task: {message_text[:100]}" + + try: + await _append_focus_item(from_agent_id, focus_id, focus_desc) + except Exception as e: + logger.warning(f"[A2A] Failed to write focus for delegate: {e}") + + trigger_name = f"a2a_wait_{target.name.lower().replace(' ', '_')}" + trigger_reason = ( + f"{target.name} has replied with the result of a delegated task. " + f"Original task: {message_text[:200]}. " + f"Steps: 1) Process {target.name}'s reply. " + f"2) Mark focus item '{focus_id}' as completed. " + f"3) Cancel this trigger. " + f"USER-FACING OUTPUT RULES: Your reply goes directly to the user's chat. " + f"Write in natural, conversational language as if talking to a colleague. " + f"NEVER use technical terms like: trigger name, focus item, a2a_wait, " + f"task_delegate, focus_ref, or any internal identifier. " + f"NEVER mention your internal operations (canceling triggers, updating focus, " + f"marking items complete, trigger status, etc.). " + f"Just summarize the task result in plain language." 
+        )
+        try:
+            await _create_on_message_trigger(
+                agent_id=from_agent_id,
+                trigger_name=trigger_name,
+                from_agent_name=target.name,
+                reason=trigger_reason,
+                focus_ref=focus_id,
+                notification_summary=f"Waiting for {target.name} to complete the task and reply",
+            )
+        except Exception as e:
+            logger.warning(f"[A2A] Failed to create trigger for delegate: {e}")
+
+        try:
+            from app.services.activity_logger import log_activity
+            await log_activity(
+                from_agent_id, "agent_msg_sent",
+                f"Delegated task to {target.name}",
+                detail={"partner": target.name, "message": message_text[:200], "msg_type": "task_delegate"},
+            )
+        except Exception:
+            pass
+
+        try:
+            await _wake_agent_async(
+                target.id,
+                f"[From {source_name}] {message_text}",
+                from_agent_id=from_agent_id,
+                skip_dedup=True,
+            )
+        except Exception as e:
+            logger.warning(f"[A2A] Failed to wake {target.name} for delegate: {e}")
+
+        return f"✅ Task delegated to {target.name}. You will be notified when they complete it."
+
+    # ── consult (default): synchronous request-response ──
     # Prepare target LLM
     from app.services.agent_context import build_agent_context
     from app.models.llm import LLMModel
@@ -4564,24 +5122,8 @@ async def _send_message_to_agent(from_agent_id: uuid.UUID, args: dict) -> str:
             role = "assistant"
         conversation_messages.append({"role": role, "content": m.content})
-    # Add the new message from source
     conversation_messages.append({"role": "user", "content": f"[From {source_name}] {message_text}"})
-    # Save source message
-    owner_id = source_agent.creator_id if source_agent else from_agent_id
-    db.add(ChatMessage(
-        agent_id=session_agent_id,
-        user_id=owner_id,
-        role="user",
-        content=message_text,
-        conversation_id=session_id,
-        participant_id=src_participant.id if src_participant else None,
-    ))
-    chat_session.last_message_at = datetime.now(timezone.utc)
-    await db.commit()
-
-    # Call target LLM with tool support (multi-round)
-    import asyncio
     import random
     import httpx
     from app.services.llm_utils import (
diff --git 
a/backend/app/services/dingtalk_reaction.py b/backend/app/services/dingtalk_reaction.py new file mode 100644 index 000000000..4899de78b --- /dev/null +++ b/backend/app/services/dingtalk_reaction.py @@ -0,0 +1,110 @@ +"""DingTalk emotion reaction service — "thinking" indicator on user messages.""" + +import asyncio +from loguru import logger +from app.services.dingtalk_token import dingtalk_token_manager + + +async def add_thinking_reaction( + app_key: str, + app_secret: str, + message_id: str, + conversation_id: str, +) -> bool: + """Add "🤔思考中" reaction to a user message. Fire-and-forget, never raises.""" + import httpx + + if not message_id or not conversation_id or not app_key: + return False + + try: + token = await dingtalk_token_manager.get_token(app_key, app_secret) + if not token: + logger.warning("[DingTalk Reaction] Failed to get access token") + return False + + async with httpx.AsyncClient(timeout=5) as client: + resp = await client.post( + "https://api.dingtalk.com/v1.0/robot/emotion/reply", + headers={ + "x-acs-dingtalk-access-token": token, + "Content-Type": "application/json", + }, + json={ + "robotCode": app_key, + "openMsgId": message_id, + "openConversationId": conversation_id, + "emotionType": 2, + "emotionName": "🤔思考中", + "textEmotion": { + "emotionId": "2659900", + "emotionName": "🤔思考中", + "text": "🤔思考中", + "backgroundId": "im_bg_1", + }, + }, + ) + if resp.status_code == 200: + logger.info(f"[DingTalk Reaction] Thinking reaction added for msg {message_id[:16]}") + return True + else: + logger.warning(f"[DingTalk Reaction] Add failed: {resp.status_code} {resp.text[:200]}") + return False + except Exception as e: + logger.warning(f"[DingTalk Reaction] Add thinking reaction error: {e}") + return False + + +async def recall_thinking_reaction( + app_key: str, + app_secret: str, + message_id: str, + conversation_id: str, +) -> None: + """Recall "🤔思考中" reaction with retry (0ms, 1500ms, 5000ms). 
Fire-and-forget.""" + import httpx + + if not message_id or not conversation_id or not app_key: + return + + delays = [0, 1.5, 5.0] + + for delay in delays: + if delay > 0: + await asyncio.sleep(delay) + + try: + token = await dingtalk_token_manager.get_token(app_key, app_secret) + if not token: + continue + + async with httpx.AsyncClient(timeout=5) as client: + resp = await client.post( + "https://api.dingtalk.com/v1.0/robot/emotion/recall", + headers={ + "x-acs-dingtalk-access-token": token, + "Content-Type": "application/json", + }, + json={ + "robotCode": app_key, + "openMsgId": message_id, + "openConversationId": conversation_id, + "emotionType": 2, + "emotionName": "🤔思考中", + "textEmotion": { + "emotionId": "2659900", + "emotionName": "🤔思考中", + "text": "🤔思考中", + "backgroundId": "im_bg_1", + }, + }, + ) + if resp.status_code == 200: + logger.info(f"[DingTalk Reaction] Thinking reaction recalled for msg {message_id[:16]}") + return + else: + logger.warning(f"[DingTalk Reaction] Recall attempt failed: {resp.status_code}") + except Exception as e: + logger.warning(f"[DingTalk Reaction] Recall error: {e}") + + logger.warning(f"[DingTalk Reaction] All recall attempts failed for msg {message_id[:16]}") diff --git a/backend/app/services/dingtalk_service.py b/backend/app/services/dingtalk_service.py index 010fc56a4..d2b70f28b 100644 --- a/backend/app/services/dingtalk_service.py +++ b/backend/app/services/dingtalk_service.py @@ -155,3 +155,20 @@ async def send_dingtalk_message( if not agent_id: agent_id = app_id return await send_dingtalk_corp_conversation(app_id, app_secret, user_id, msg_body, agent_id) + + +async def download_dingtalk_media( + app_id: str, app_secret: str, download_code: str +) -> bytes | None: + """Download a media file from DingTalk using a downloadCode. + + Convenience wrapper that delegates to the stream module's download helper. + Returns raw file bytes on success, or None on failure. + + Args: + app_id: DingTalk app key (robotCode). 
+ app_secret: DingTalk app secret. + download_code: The downloadCode from the incoming message payload. + """ + from app.services.dingtalk_stream import download_dingtalk_media as _download + return await _download(app_id, app_secret, download_code) diff --git a/backend/app/services/dingtalk_stream.py b/backend/app/services/dingtalk_stream.py index 28a8ba8e2..73cf80b20 100644 --- a/backend/app/services/dingtalk_stream.py +++ b/backend/app/services/dingtalk_stream.py @@ -5,15 +5,401 @@ """ import asyncio +import base64 +import json import threading import uuid -from typing import Dict +from pathlib import Path +from typing import Dict, List, Optional, Tuple +import httpx from loguru import logger from sqlalchemy import select +from app.config import get_settings from app.database import async_session from app.models.channel_config import ChannelConfig +from app.services.dingtalk_token import dingtalk_token_manager + + +# ─── DingTalk Media Helpers ───────────────────────────── + + +async def _get_media_download_url( + access_token: str, download_code: str, robot_code: str +) -> Optional[str]: + """Get media file download URL from DingTalk API.""" + try: + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.post( + "https://api.dingtalk.com/v1.0/robot/messageFiles/download", + headers={"x-acs-dingtalk-access-token": access_token}, + json={"downloadCode": download_code, "robotCode": robot_code}, + ) + data = resp.json() + url = data.get("downloadUrl") + if url: + return url + logger.error(f"[DingTalk] Failed to get download URL: {data}") + return None + except Exception as e: + logger.error(f"[DingTalk] Error getting download URL: {e}") + return None + + +async def _download_file(url: str) -> Optional[bytes]: + """Download a file from a URL and return its bytes.""" + try: + async with httpx.AsyncClient(timeout=60, follow_redirects=True) as client: + resp = await client.get(url) + resp.raise_for_status() + return resp.content + except Exception as 
e: + logger.error(f"[DingTalk] Error downloading file: {e}") + return None + + +async def download_dingtalk_media( + app_key: str, app_secret: str, download_code: str +) -> Optional[bytes]: + """Download a media file from DingTalk using downloadCode. + + Steps: get access_token -> get download URL -> download file bytes. + """ + access_token = await dingtalk_token_manager.get_token(app_key, app_secret) + if not access_token: + return None + + download_url = await _get_media_download_url(access_token, download_code, app_key) + if not download_url: + return None + + return await _download_file(download_url) + + +def _resolve_upload_dir(agent_id: uuid.UUID) -> Path: + """Get the uploads directory for an agent, creating it if needed.""" + settings = get_settings() + upload_dir = Path(settings.AGENT_DATA_DIR) / str(agent_id) / "workspace" / "uploads" + upload_dir.mkdir(parents=True, exist_ok=True) + return upload_dir + + +async def _process_media_message( + msg_data: dict, + app_key: str, + app_secret: str, + agent_id: uuid.UUID, +) -> Tuple[str, Optional[List[str]], Optional[List[str]]]: + """Process a DingTalk message and extract text + media info. 
+ + Returns: + (user_text, image_base64_list, saved_file_paths) + - user_text: text content for the LLM (may include markers) + - image_base64_list: list of base64-encoded image data URIs, or None + - saved_file_paths: list of saved file paths, or None + """ + msgtype = msg_data.get("msgtype", "text") + logger.info(f"[DingTalk] Processing message type: {msgtype}") + + image_base64_list: List[str] = [] + saved_file_paths: List[str] = [] + + if msgtype == "text": + text_content = msg_data.get("text", {}).get("content", "").strip() + return text_content, None, None + + elif msgtype == "picture": + download_code = msg_data.get("content", {}).get("downloadCode", "") + if not download_code: + download_code = msg_data.get("downloadCode", "") + if not download_code: + logger.warning("[DingTalk] Picture message without downloadCode") + return "[User sent an image, but it could not be downloaded]", None, None + + file_bytes = await download_dingtalk_media(app_key, app_secret, download_code) + if not file_bytes: + return "[User sent an image, but download failed]", None, None + + upload_dir = _resolve_upload_dir(agent_id) + filename = f"dingtalk_img_{uuid.uuid4().hex[:8]}.jpg" + save_path = upload_dir / filename + save_path.write_bytes(file_bytes) + logger.info(f"[DingTalk] Saved image to {save_path} ({len(file_bytes)} bytes)") + + b64_data = base64.b64encode(file_bytes).decode("ascii") + image_marker = f"[image_data:data:image/jpeg;base64,{b64_data}]" + return ( + f"[User sent an image]\n{image_marker}", + [f"data:image/jpeg;base64,{b64_data}"], + [str(save_path)], + ) + + elif msgtype == "richText": + rich_text = msg_data.get("content", {}).get("richText", []) + text_parts: List[str] = [] + + for section in rich_text: + for item in section if isinstance(section, list) else [section]: + if "text" in item: + text_parts.append(item["text"]) + elif "downloadCode" in item: + file_bytes = await download_dingtalk_media( + app_key, app_secret, item["downloadCode"] + ) + if 
file_bytes:
+                        upload_dir = _resolve_upload_dir(agent_id)
+                        filename = f"dingtalk_richimg_{uuid.uuid4().hex[:8]}.jpg"
+                        save_path = upload_dir / filename
+                        save_path.write_bytes(file_bytes)
+                        logger.info(f"[DingTalk] Saved rich text image to {save_path}")
+
+                        b64_data = base64.b64encode(file_bytes).decode("ascii")
+                        image_marker = f"[image_data:data:image/jpeg;base64,{b64_data}]"
+                        text_parts.append(image_marker)
+                        image_base64_list.append(f"data:image/jpeg;base64,{b64_data}")
+                        saved_file_paths.append(str(save_path))
+
+        combined_text = "\n".join(text_parts).strip()
+        if not combined_text:
+            combined_text = "[User sent a rich text message]"
+
+        return (
+            combined_text,
+            image_base64_list if image_base64_list else None,
+            saved_file_paths if saved_file_paths else None,
+        )
+
+    elif msgtype == "audio":
+        content = msg_data.get("content", {})
+        recognition = content.get("recognition", "")
+        if recognition:
+            logger.info(f"[DingTalk] Audio with recognition: {recognition[:80]}")
+            return f"[Voice message] {recognition}", None, None
+
+        download_code = content.get("downloadCode", "")
+        if download_code:
+            file_bytes = await download_dingtalk_media(app_key, app_secret, download_code)
+            if file_bytes:
+                upload_dir = _resolve_upload_dir(agent_id)
+                duration = content.get("duration", "unknown")
+                filename = f"dingtalk_audio_{uuid.uuid4().hex[:8]}.amr"
+                save_path = upload_dir / filename
+                save_path.write_bytes(file_bytes)
+                logger.info(f"[DingTalk] Saved audio to {save_path} ({len(file_bytes)} bytes)")
+                return (
+                    f"[User sent a voice message, duration {duration}ms, saved to {save_path}]",
+                    None,
+                    [str(save_path)],
+                )
+        return "[User sent a voice message, but it could not be processed]", None, None
+
+    elif msgtype == "video":
+        content = msg_data.get("content", {})
+        download_code = content.get("downloadCode", "")
+        if download_code:
+            file_bytes = await download_dingtalk_media(app_key, app_secret, download_code)
+            if file_bytes:
+                upload_dir = _resolve_upload_dir(agent_id)
+                duration = content.get("duration", "unknown")
+                filename = f"dingtalk_video_{uuid.uuid4().hex[:8]}.mp4"
+                save_path = upload_dir / filename
+                save_path.write_bytes(file_bytes)
+                logger.info(f"[DingTalk] Saved video to {save_path} ({len(file_bytes)} bytes)")
+                return (
+                    f"[User sent a video, duration {duration}ms, saved to {save_path}]",
+                    None,
+                    [str(save_path)],
+                )
+        return "[User sent a video, but it could not be downloaded]", None, None
+
+    elif msgtype == "file":
+        content = msg_data.get("content", {})
+        download_code = content.get("downloadCode", "")
+        original_filename = content.get("fileName", "unknown_file")
+        if download_code:
+            file_bytes = await download_dingtalk_media(app_key, app_secret, download_code)
+            if file_bytes:
+                upload_dir = _resolve_upload_dir(agent_id)
+                safe_name = f"dingtalk_{uuid.uuid4().hex[:8]}_{original_filename}"
+                save_path = upload_dir / safe_name
+                save_path.write_bytes(file_bytes)
+                logger.info(
+                    f"[DingTalk] Saved file '{original_filename}' to {save_path} "
+                    f"({len(file_bytes)} bytes)"
+                )
+                return (
+                    f"[file:{original_filename}]",
+                    None,
+                    [str(save_path)],
+                )
+        return f"[User sent file {original_filename}, but it could not be downloaded]", None, None
+
+    else:
+        logger.warning(f"[DingTalk] Unsupported message type: {msgtype}")
+        return f"[User sent a {msgtype} message, which is not yet supported]", None, None
+
+
+# ─── DingTalk Media Upload & Send ───────────────────────
+
+async def _upload_dingtalk_media(
+    app_key: str,
+    app_secret: str,
+    file_path: str,
+    media_type: str = "file",
+) -> Optional[str]:
+    """Upload a media file to DingTalk and return the mediaId.
+
+    Args:
+        app_key: DingTalk app key (robotCode).
+        app_secret: DingTalk app secret.
+        file_path: Local file path to upload.
+        media_type: One of 'image', 'voice', 'video', 'file'.
+
+    Returns:
+        mediaId string on success, None on failure.
+ """ + access_token = await dingtalk_token_manager.get_token(app_key, app_secret) + if not access_token: + return None + + file_p = Path(file_path) + if not file_p.exists(): + logger.error(f"[DingTalk] Upload failed: file not found: {file_path}") + return None + + try: + file_bytes = file_p.read_bytes() + async with httpx.AsyncClient(timeout=60) as client: + # Use the legacy oapi endpoint which is more reliable and widely supported. + upload_url = ( + f"https://oapi.dingtalk.com/media/upload" + f"?access_token={access_token}&type={media_type}" + ) + resp = await client.post( + upload_url, + files={"media": (file_p.name, file_bytes)}, + ) + data = resp.json() + # Legacy API returns media_id (snake_case), new API returns mediaId + media_id = data.get("media_id") or data.get("mediaId") + if media_id and data.get("errcode", 0) == 0: + logger.info( + f"[DingTalk] Uploaded {media_type} '{file_p.name}' -> mediaId={media_id[:20]}..." + ) + return media_id + logger.error(f"[DingTalk] Upload failed: {data}") + return None + except Exception as e: + logger.error(f"[DingTalk] Upload error: {e}") + return None + + +async def _send_dingtalk_media_message( + app_key: str, + app_secret: str, + target_id: str, + media_id: str, + media_type: str, + conversation_type: str, + filename: Optional[str] = None, +) -> bool: + """Send a media message via DingTalk proactive message API. + + Args: + app_key: DingTalk app key (robotCode). + app_secret: DingTalk app secret. + target_id: For P2P: sender_staff_id; For group: openConversationId. + media_id: The mediaId from upload. + media_type: One of 'image', 'voice', 'video', 'file'. + conversation_type: '1' for P2P, '2' for group. + filename: Original filename (used for file/video types). + + Returns: + True on success, False on failure. 
+ """ + access_token = await dingtalk_token_manager.get_token(app_key, app_secret) + if not access_token: + return False + + headers = {"x-acs-dingtalk-access-token": access_token} + + # Build msgKey and msgParam based on media_type + if media_type == "image": + msg_key = "sampleImageMsg" + msg_param = json.dumps({"photoURL": media_id}) + elif media_type == "voice": + msg_key = "sampleAudio" + msg_param = json.dumps({"mediaId": media_id, "duration": "3000"}) + elif media_type == "video": + safe_name = filename or "video.mp4" + ext = Path(safe_name).suffix.lstrip(".") or "mp4" + msg_key = "sampleFile" + msg_param = json.dumps({ + "mediaId": media_id, + "fileName": safe_name, + "fileType": ext, + }) + else: + # file + safe_name = filename or "file" + ext = Path(safe_name).suffix.lstrip(".") or "bin" + msg_key = "sampleFile" + msg_param = json.dumps({ + "mediaId": media_id, + "fileName": safe_name, + "fileType": ext, + }) + + try: + async with httpx.AsyncClient(timeout=15) as client: + if conversation_type == "2": + # Group chat + resp = await client.post( + "https://api.dingtalk.com/v1.0/robot/groupMessages/send", + headers=headers, + json={ + "robotCode": app_key, + "openConversationId": target_id, + "msgKey": msg_key, + "msgParam": msg_param, + }, + ) + else: + # P2P chat + resp = await client.post( + "https://api.dingtalk.com/v1.0/robot/oToMessages/batchSend", + headers=headers, + json={ + "robotCode": app_key, + "userIds": [target_id], + "msgKey": msg_key, + "msgParam": msg_param, + }, + ) + + data = resp.json() + if resp.status_code >= 400 or data.get("errcode"): + logger.error(f"[DingTalk] Send media failed: {data}") + return False + + logger.info( + f"[DingTalk] Sent {media_type} message to {target_id[:16]}... 
" + f"(conv_type={conversation_type})" + ) + return True + except Exception as e: + logger.error(f"[DingTalk] Send media error: {e}") + return False + + +# ─── Stream Manager ───────────────────────────────────── + + +def _fire_and_forget(loop, coro): + """Schedule a coroutine on the main loop and log any unhandled exception.""" + future = asyncio.run_coroutine_threadsafe(coro, loop) + future.add_done_callback(lambda f: f.exception() if not f.cancelled() else None) class DingTalkStreamManager: @@ -67,47 +453,72 @@ def _run_client_thread( app_secret: str, stop_event: threading.Event, ): - """Run the DingTalk Stream client in a blocking thread.""" + """Run the DingTalk Stream client with auto-reconnect.""" try: import dingtalk_stream + except ImportError: + logger.warning( + "[DingTalk Stream] dingtalk-stream package not installed. " + "Install with: pip install dingtalk-stream" + ) + self._threads.pop(agent_id, None) + self._stop_events.pop(agent_id, None) + return - # Reference to manager's main loop for async dispatch - main_loop = self._main_loop - - class ClawithChatbotHandler(dingtalk_stream.ChatbotHandler): - """Custom handler that dispatches messages to the Clawith LLM pipeline.""" - - async def process(self, callback: dingtalk_stream.CallbackMessage): - """Handle incoming bot message from DingTalk Stream. - - NOTE: The SDK invokes this method in the thread's own asyncio loop, - so we must dispatch to the main FastAPI loop for DB + LLM work. 
- """ - try: - # Parse the raw data into a ChatbotMessage via class method - incoming = dingtalk_stream.ChatbotMessage.from_dict(callback.data) - - # Extract text content + MAX_RETRIES = 5 + RETRY_DELAYS = [2, 5, 15, 30, 60] # exponential backoff, seconds + + # Reference to manager's main loop for async dispatch + main_loop = self._main_loop + retries = 0 + manager_self = self + + class ClawithChatbotHandler(dingtalk_stream.ChatbotHandler): + """Custom handler that dispatches messages to the Clawith LLM pipeline.""" + + async def process(self, callback: dingtalk_stream.CallbackMessage): + """Handle incoming bot message from DingTalk Stream. + + NOTE: The SDK invokes this method in the thread's own asyncio loop, + so we must dispatch to the main FastAPI loop for DB + LLM work. + """ + try: + # Parse the raw data + incoming = dingtalk_stream.ChatbotMessage.from_dict(callback.data) + msg_data = callback.data if isinstance(callback.data, dict) else json.loads(callback.data) + + msgtype = msg_data.get("msgtype", "text") + sender_staff_id = incoming.sender_staff_id or incoming.sender_id or "" + sender_nick = incoming.sender_nick or "" + message_id = incoming.message_id or "" + conversation_id = incoming.conversation_id or "" + conversation_type = incoming.conversation_type or "1" + session_webhook = incoming.session_webhook or "" + + logger.info( + f"[DingTalk Stream] Received {msgtype} message from {sender_staff_id}" + ) + + if msgtype == "text": + # Plain text: use existing logic text_list = incoming.get_text_list() user_text = " ".join(text_list).strip() if text_list else "" - if not user_text: return dingtalk_stream.AckMessage.STATUS_OK, "empty message" - sender_staff_id = incoming.sender_staff_id or incoming.sender_id or "" - conversation_id = incoming.conversation_id or "" - conversation_type = incoming.conversation_type or "1" - session_webhook = incoming.session_webhook or "" - logger.info( - f"[DingTalk Stream] Message from 
[{incoming.sender_nick}]{sender_staff_id}: {user_text[:80]}" + f"[DingTalk Stream] Text from {sender_staff_id}: {user_text[:80]}" ) - # Dispatch to the main FastAPI event loop for DB + LLM processing from app.api.dingtalk import process_dingtalk_message if main_loop and main_loop.is_running(): - future = asyncio.run_coroutine_threadsafe( + # Add thinking reaction immediately + from app.services.dingtalk_reaction import add_thinking_reaction + _fire_and_forget(main_loop, + add_thinking_reaction(app_key, app_secret, message_id, conversation_id)) + + _fire_and_forget(main_loop, process_dingtalk_message( agent_id=agent_id, sender_staff_id=sender_staff_id, @@ -115,50 +526,134 @@ async def process(self, callback: dingtalk_stream.CallbackMessage): conversation_id=conversation_id, conversation_type=conversation_type, session_webhook=session_webhook, - ), - main_loop, - ) - # Wait for result (with timeout) - try: - future.result(timeout=120) - except Exception as e: - logger.error(f"[DingTalk Stream] LLM processing error: {e}") - import traceback - traceback.print_exc() + sender_nick=sender_nick, + message_id=message_id, + )) else: - logger.warning("[DingTalk Stream] Main loop not available for dispatch") - - return dingtalk_stream.AckMessage.STATUS_OK, "ok" - except Exception as e: - logger.error(f"[DingTalk Stream] Error in message handler: {e}") - import traceback - traceback.print_exc() - return dingtalk_stream.AckMessage.STATUS_SYSTEM_EXCEPTION, str(e) - - credential = dingtalk_stream.Credential(client_id=app_key, client_secret=app_secret) - client = dingtalk_stream.DingTalkStreamClient(credential=credential) - client.register_callback_handler( - dingtalk_stream.chatbot.ChatbotMessage.TOPIC, - ClawithChatbotHandler(), - ) + logger.warning("[DingTalk Stream] Main loop not available") - logger.info(f"[DingTalk Stream] Connecting for agent {agent_id}...") - # start_forever() blocks until disconnected - client.start_forever() + else: + # Non-text message: process media in 
the main loop
+                        if main_loop and main_loop.is_running():
+                            # Add thinking reaction immediately
+                            from app.services.dingtalk_reaction import add_thinking_reaction
+                            _fire_and_forget(main_loop,
+                                add_thinking_reaction(app_key, app_secret, message_id, conversation_id))
+
+                            _fire_and_forget(main_loop,
+                                manager_self._handle_media_and_dispatch(
+                                    msg_data=msg_data,
+                                    app_key=app_key,
+                                    app_secret=app_secret,
+                                    agent_id=agent_id,
+                                    sender_staff_id=sender_staff_id,
+                                    conversation_id=conversation_id,
+                                    conversation_type=conversation_type,
+                                    session_webhook=session_webhook,
+                                    sender_nick=sender_nick,
+                                    message_id=message_id,
+                                ))
+                        else:
+                            logger.warning("[DingTalk Stream] Main loop not available")
+
+                    return dingtalk_stream.AckMessage.STATUS_OK, "ok"
+                except Exception as e:
+                    logger.error(f"[DingTalk Stream] Error in message handler: {e}")
+                    import traceback
+                    traceback.print_exc()
+                    return dingtalk_stream.AckMessage.STATUS_SYSTEM_EXCEPTION, str(e)
+
+        import time  # measures how long each connection survived before dropping
+
+        while not stop_event.is_set() and retries <= MAX_RETRIES:
+            try:
+                credential = dingtalk_stream.Credential(client_id=app_key, client_secret=app_secret)
+                client = dingtalk_stream.DingTalkStreamClient(credential=credential)
+                client.register_callback_handler(
+                    dingtalk_stream.chatbot.ChatbotMessage.TOPIC,
+                    ClawithChatbotHandler(),
+                )
-        except ImportError:
-            logger.warning(
-                "[DingTalk Stream] dingtalk-stream package not installed. "
-                "Install with: pip install dingtalk-stream"
+                logger.info(
+                    f"[DingTalk Stream] Connecting for agent {agent_id}... "
+                    f"(attempt {retries + 1}/{MAX_RETRIES + 1})"
+                )
+                # start_forever() blocks until disconnected
+                connected_at = time.monotonic()
+                client.start_forever()
+
+                # start_forever returned: connection dropped
+                if stop_event.is_set():
+                    break  # intentional stop, no retry
+
+                # Reset the backoff only if the connection actually ran for a
+                # while before dropping; rapid disconnects keep escalating it.
+                if time.monotonic() - connected_at > 60:
+                    retries = 0
+                retries += 1
+                logger.warning(
+                    f"[DingTalk Stream] Connection lost for agent {agent_id}, will retry..."
+                )
+                delay = RETRY_DELAYS[min(retries - 1, len(RETRY_DELAYS) - 1)]
+                if stop_event.wait(timeout=delay):
+                    break  # stop was requested during wait
+
+            except Exception as e:
+                retries += 1
+                logger.error(
+                    f"[DingTalk Stream] Connection error for {agent_id} "
+                    f"(attempt {retries}/{MAX_RETRIES + 1}): {e}"
+                )
+
+                if retries > MAX_RETRIES:
+                    logger.error(
+                        f"[DingTalk Stream] Agent {agent_id} exhausted all {MAX_RETRIES + 1} connection attempts, giving up"
+                    )
+                    break
+
+                delay = RETRY_DELAYS[min(retries - 1, len(RETRY_DELAYS) - 1)]
+                logger.info(
+                    f"[DingTalk Stream] Retrying in {delay}s for agent {agent_id}..."
                )
-        except Exception as e:
-            logger.error(f"[DingTalk Stream] Client error for {agent_id}: {e}")
-            import traceback
-            traceback.print_exc()
-        finally:
-            self._threads.pop(agent_id, None)
-            self._stop_events.pop(agent_id, None)
-            logger.info(f"[DingTalk Stream] Client stopped for agent {agent_id}")
+                # Use stop_event.wait so we exit immediately if stopped
+                if stop_event.wait(timeout=delay):
+                    break  # stop was requested during wait
+
+        self._threads.pop(agent_id, None)
+        self._stop_events.pop(agent_id, None)
+        logger.info(f"[DingTalk Stream] Client stopped for agent {agent_id}")
+
+    @staticmethod
+    async def _handle_media_and_dispatch(
+        msg_data: dict,
+        app_key: str,
+        app_secret: str,
+        agent_id: uuid.UUID,
+        sender_staff_id: str,
+        conversation_id: str,
+        conversation_type: str,
+        session_webhook: str,
+        sender_nick: str = "",
+        message_id: str = "",
+    ):
+        """Download media, then dispatch to process_dingtalk_message."""
+        from app.api.dingtalk import process_dingtalk_message
+
+        user_text, image_base64_list, saved_file_paths = await _process_media_message(
+            msg_data=msg_data,
+            app_key=app_key,
+            app_secret=app_secret,
+            agent_id=agent_id,
+        )
+
+        if not user_text:
+            logger.info("[DingTalk Stream] Empty content after media processing, skipping")
+            return
+
+        await process_dingtalk_message(
+            agent_id=agent_id,
+            sender_staff_id=sender_staff_id,
+            user_text=user_text,
+            conversation_id=conversation_id,
+            conversation_type=conversation_type,
+            session_webhook=session_webhook,
+            
image_base64_list=image_base64_list, + saved_file_paths=saved_file_paths, + sender_nick=sender_nick, + message_id=message_id, + ) async def stop_client(self, agent_id: uuid.UUID): """Stop a running Stream client for an agent.""" @@ -167,7 +662,10 @@ async def stop_client(self, agent_id: uuid.UUID): stop_event.set() thread = self._threads.pop(agent_id, None) if thread and thread.is_alive(): - logger.info(f"[DingTalk Stream] Stopping client for agent {agent_id}") + logger.info(f"[DingTalk Stream] Stopping client for agent {agent_id}, waiting for thread...") + thread.join(timeout=5) + if thread.is_alive(): + logger.warning(f"[DingTalk Stream] Thread for {agent_id} did not exit within 5s") async def start_all(self): """Start Stream clients for all configured DingTalk agents.""" diff --git a/backend/app/services/dingtalk_token.py b/backend/app/services/dingtalk_token.py new file mode 100644 index 000000000..09b516822 --- /dev/null +++ b/backend/app/services/dingtalk_token.py @@ -0,0 +1,77 @@ +"""DingTalk access_token global cache manager. + +Caches tokens per app_key with auto-refresh before expiry. +All DingTalk token acquisition should go through this manager. +""" + +import time +import asyncio +from typing import Dict, Optional, Tuple +from loguru import logger +import httpx + + +class DingTalkTokenManager: + """Global DingTalk access_token cache. 
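+
+    All token acquisition should go through the module-level singleton, e.g.::
+
+        token = await dingtalk_token_manager.get_token(app_key, app_secret)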
+ + - Cache by app_key + - Token valid for 7200s, refresh 300s early + - Concurrency-safe with asyncio.Lock + """ + + def __init__(self): + self._cache: Dict[str, Tuple[str, float]] = {} + self._locks: Dict[str, asyncio.Lock] = {} + + def _get_lock(self, app_key: str) -> asyncio.Lock: + if app_key not in self._locks: + self._locks[app_key] = asyncio.Lock() + return self._locks[app_key] + + async def get_token(self, app_key: str, app_secret: str) -> Optional[str]: + """Get access_token, return cached if valid, refresh if expired.""" + if app_key in self._cache: + token, expires_at = self._cache[app_key] + if time.time() < expires_at - 300: + return token + + async with self._get_lock(app_key): + # Double-check after acquiring lock + if app_key in self._cache: + token, expires_at = self._cache[app_key] + if time.time() < expires_at - 300: + return token + + try: + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.post( + "https://api.dingtalk.com/v1.0/oauth2/accessToken", + json={"appKey": app_key, "appSecret": app_secret}, + ) + data = resp.json() + token = data.get("accessToken") + expires_in = data.get("expireIn", 7200) + + if token: + self._cache[app_key] = (token, time.time() + expires_in) + logger.debug(f"[DingTalk Token] Refreshed for {app_key[:8]}..., expires in {expires_in}s") + return token + + logger.error(f"[DingTalk Token] Failed to get token: {data}") + return None + except Exception as e: + logger.error(f"[DingTalk Token] Error getting token: {e}") + return None + + async def get_corp_token(self, app_key: str, app_secret: str) -> Optional[str]: + """Get corp access_token via oapi.dingtalk.com/gettoken (GET). + + Used for corp API calls like /topapi/v2/user/get. + Shares the same cache since the token works for both APIs. 
+ """ + # The v1.0 OAuth2 token also works for corp APIs, so reuse it + return await self.get_token(app_key, app_secret) + + +# Global singleton +dingtalk_token_manager = DingTalkTokenManager() diff --git a/backend/app/services/org_sync_adapter.py b/backend/app/services/org_sync_adapter.py index 8a3525d13..8a37984eb 100644 --- a/backend/app/services/org_sync_adapter.py +++ b/backend/app/services/org_sync_adapter.py @@ -4,6 +4,7 @@ from various identity providers (Feishu, DingTalk, WeCom, etc.). """ +import asyncio import uuid from abc import ABC, abstractmethod from dataclasses import dataclass, field @@ -586,7 +587,6 @@ async def get_access_token(self) -> str: async def fetch_departments(self) -> list[ExternalDepartment]: """Fetch all departments from Feishu using concurrent recursive calls to get parent-child relationships.""" - import asyncio token = await self.get_access_token() all_depts: list[ExternalDepartment] = [] # Add a virtual root for the tenant, consistent with DingTalk root behavior @@ -810,6 +810,7 @@ async def fetch_departments(self) -> list[ExternalDepartment]: seen: set[int] = set() queue: list[int] = [1] # DingTalk root dept id + _request_count = 0 async with httpx.AsyncClient() as client: while queue: @@ -818,6 +819,12 @@ async def fetch_departments(self) -> list[ExternalDepartment]: continue seen.add(parent_id) + # DingTalk rate limit: ~20 QPS per app per interface. + # Sleep 60ms between requests to stay under the limit. + if _request_count > 0: + await asyncio.sleep(0.06) + _request_count += 1 + resp = await client.post( self.DINGTALK_DEPT_LIST_URL, params={"access_token": token}, @@ -875,6 +882,10 @@ async def fetch_users(self, department_external_id: str) -> list[ExternalUser]: async with httpx.AsyncClient() as client: while True: + # DingTalk rate limit: ~20 QPS per app per interface. + # Sleep 60ms between requests to stay under the limit. 
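+                # (One request per 60ms is ~16.7 req/s, a safe margin below 20 QPS.)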
+ await asyncio.sleep(0.06) + resp = await client.post( self.DINGTALK_USER_LIST_URL, params={"access_token": token}, diff --git a/backend/app/services/tool_seeder.py b/backend/app/services/tool_seeder.py index 5ac16d91a..7584ab0f0 100644 --- a/backend/app/services/tool_seeder.py +++ b/backend/app/services/tool_seeder.py @@ -272,7 +272,7 @@ { "name": "send_message_to_agent", "display_name": "Agent Message", - "description": "Send a message to a digital employee colleague and receive a reply. Suitable for questions, delegation, or collaboration.", + "description": "Send a message to a digital employee colleague. Decision guide: target needs to DO WORK and return results? → task_delegate. Just FYI? → notify. Quick factual question? → consult. When unsure, prefer task_delegate.", "category": "communication", "icon": "🤖", "is_default": True, @@ -281,9 +281,9 @@ "properties": { "agent_name": {"type": "string", "description": "Target agent name"}, "message": {"type": "string", "description": "Message content"}, - "msg_type": {"type": "string", "enum": ["chat", "task_request", "info_share"], "description": "Message type"}, + "msg_type": {"type": "string", "enum": ["notify", "consult", "task_delegate"], "description": "(1) Target needs to DO WORK and return results? → task_delegate. (2) Just FYI? → notify. (3) Quick factual question? → consult. When unsure, prefer task_delegate."}, }, - "required": ["agent_name", "message"], + "required": ["agent_name", "message", "msg_type"], }, "config": {}, "config_schema": {}, @@ -310,10 +310,10 @@ { "name": "web_search", "display_name": "Web Search", - "description": "Search the internet using a configurable search engine. Supports DuckDuckGo (free), Tavily, Google, and Bing. Configure the search engine in the tool settings.", + "description": "[Deprecated] Unified search tool with engine selector. 
Use the dedicated tools (DuckDuckGo Search, Tavily Search, Google Search, Bing Search, Exa Search) instead for better control per engine.", "category": "search", "icon": "🔍", - "is_default": True, + "is_default": False, "parameters_schema": { "type": "object", "properties": { @@ -339,6 +339,7 @@ {"value": "tavily", "label": "Tavily (AI search, needs API key)"}, {"value": "google", "label": "Google Custom Search (needs API key)"}, {"value": "bing", "label": "Bing Search API (needs API key)"}, + {"value": "exa", "label": "Exa (AI-powered search, needs API key)"}, ], "default": "duckduckgo", }, @@ -348,7 +349,7 @@ "type": "password", "default": "", "placeholder": "Required for engines that need an API key", - "depends_on": {"search_engine": ["tavily", "google", "bing"]}, + "depends_on": {"search_engine": ["tavily", "google", "bing", "exa"]}, }, { "key": "max_results", @@ -428,6 +429,186 @@ ] }, }, + { + "name": "exa_search", + "display_name": "Exa Search", + "description": "AI-powered web search using Exa (exa.ai). Supports semantic search, category filtering, domain filtering, and multiple content modes (text, highlights, summary). Requires an Exa API key.", + "category": "search", + "icon": "🔎", + "is_default": False, + "parameters_schema": { + "type": "object", + "properties": { + "query": {"type": "string", "description": "Search query"}, + "max_results": {"type": "integer", "description": "Number of results (default 5, max 10)"}, + "search_type": { + "type": "string", + "description": "Search type: auto (default), neural, or fast", + "enum": ["auto", "neural", "fast"], + }, + "category": { + "type": "string", + "description": "Filter by category: company, research paper, news, personal site, financial report, or people", + }, + "include_domains": { + "type": "string", + "description": "Comma-separated domains to restrict results to (e.g. 
'arxiv.org, github.com')", + }, + "exclude_domains": { + "type": "string", + "description": "Comma-separated domains to exclude from results", + }, + "content_mode": { + "type": "string", + "description": "Content retrieval mode: text (default), highlights, or summary", + "enum": ["text", "highlights", "summary"], + }, + }, + "required": ["query"], + }, + "config": {}, + "config_schema": { + "fields": [ + { + "key": "api_key", + "label": "Exa API Key", + "type": "password", + "default": "", + "placeholder": "Get your API key at exa.ai", + }, + ] + }, + }, + # ── Standalone search engines (each engine as its own tool) ────────────── + # These complement web_search (which remains for backward compatibility). + # Each tool wraps a single engine so agents can pick the right one for the + # task without going through the unified engine-selector flow. + { + "name": "duckduckgo_search", + "display_name": "DuckDuckGo Search", + "description": "Search the internet using DuckDuckGo. Free, no API key required. Returns titles, URLs, and snippets.", + "category": "search", + "icon": "🦆", + "is_default": False, + "parameters_schema": { + "type": "object", + "properties": { + "query": {"type": "string", "description": "Search keywords"}, + "max_results": {"type": "integer", "description": "Number of results to return (default 5, max 10)"}, + }, + "required": ["query"], + }, + "config": {}, + "config_schema": {"fields": []}, + }, + { + "name": "tavily_search", + "display_name": "Tavily Search", + "description": "AI-optimized web search using Tavily. Returns high-quality results with summaries. 
Requires a Tavily API key.", + "category": "search", + "icon": "🔍", + "is_default": False, + "parameters_schema": { + "type": "object", + "properties": { + "query": {"type": "string", "description": "Search keywords"}, + "max_results": {"type": "integer", "description": "Number of results to return (default 5, max 10)"}, + }, + "required": ["query"], + }, + "config": {}, + "config_schema": { + "fields": [ + { + "key": "api_key", + "label": "Tavily API Key", + "type": "password", + "default": "", + "placeholder": "tvly-xxxxxxxxxxxxxxxx (get one at tavily.com)", + }, + ] + }, + }, + { + "name": "google_search", + "display_name": "Google Search", + "description": "Search using Google Custom Search JSON API. Returns titles, URLs, and snippets. Requires a Google API key and Custom Search Engine ID (format: API_KEY:CX_ID).", + "category": "search", + "icon": "🔍", + "is_default": False, + "parameters_schema": { + "type": "object", + "properties": { + "query": {"type": "string", "description": "Search keywords"}, + "max_results": {"type": "integer", "description": "Number of results to return (default 5, max 10)"}, + "language": {"type": "string", "description": "Search language code (e.g. 'en', 'zh')"}, + }, + "required": ["query"], + }, + "config": {"language": "en"}, + "config_schema": { + "fields": [ + { + "key": "api_key", + "label": "API Key & Search Engine ID", + "type": "password", + "default": "", + "placeholder": "API_KEY:SEARCH_ENGINE_ID (get at console.cloud.google.com)", + }, + { + "key": "language", + "label": "Search language", + "type": "select", + "options": [ + {"value": "en", "label": "English"}, + {"value": "zh-CN", "label": "Chinese"}, + {"value": "ja", "label": "Japanese"}, + ], + "default": "en", + }, + ] + }, + }, + { + "name": "bing_search", + "display_name": "Bing Search", + "description": "Search using Bing Web Search API. Returns titles, URLs, and snippets. 
Requires a Bing Search API key from Microsoft Azure.", + "category": "search", + "icon": "🔍", + "is_default": False, + "parameters_schema": { + "type": "object", + "properties": { + "query": {"type": "string", "description": "Search keywords"}, + "max_results": {"type": "integer", "description": "Number of results to return (default 5, max 10)"}, + "language": {"type": "string", "description": "Market language code (e.g. 'en-US', 'zh-CN')"}, + }, + "required": ["query"], + }, + "config": {"language": "en-US"}, + "config_schema": { + "fields": [ + { + "key": "api_key", + "label": "Bing Search API Key", + "type": "password", + "default": "", + "placeholder": "Get from Azure Cognitive Services (Bing Search v7)", + }, + { + "key": "language", + "label": "Market language", + "type": "select", + "options": [ + {"value": "en-US", "label": "English (US)"}, + {"value": "zh-CN", "label": "Chinese (Simplified)"}, + {"value": "ja-JP", "label": "Japanese"}, + ], + "default": "en-US", + }, + ] + }, + }, { "name": "plaza_get_new_posts", "display_name": "Plaza: Browse", @@ -1901,6 +2082,7 @@ async def seed_builtin_tools(): category=t["category"], icon=t["icon"], is_default=t["is_default"], + parameters_schema=t.get("parameters_schema", {"type": "object", "properties": {}}), config=t.get("config", {}), config_schema=t.get("config_schema", {}), source="builtin", diff --git a/backend/app/services/trigger_daemon.py b/backend/app/services/trigger_daemon.py index be052566e..9cf164af7 100644 --- a/backend/app/services/trigger_daemon.py +++ b/backend/app/services/trigger_daemon.py @@ -30,9 +30,19 @@ MAX_AGENT_CHAIN_DEPTH = 5 # A→B→A→B→A max depth before stopping MIN_POLL_INTERVAL_MINUTES = 5 # minimum poll interval to prevent abuse -# Track last invocation time per agent to enforce dedup window _last_invoke: dict[uuid.UUID, datetime] = {} +_A2A_WAKE_CHAIN: dict[str, int] = {} +_A2A_WAKE_CHAIN_TTL = 300 +_A2A_MAX_WAKE_DEPTH = 3 + + +def _cleanup_stale_invoke_cache(): + now = 
datetime.now(timezone.utc) + stale = [k for k, v in _last_invoke.items() if (now - v).total_seconds() > DEDUP_WINDOW * 2] + for k in stale: + del _last_invoke[k] + # Webhook rate limiter: token -> list of timestamps _webhook_hits: dict[str, list[float]] = {} WEBHOOK_RATE_LIMIT = 5 # max hits per minute per token @@ -265,8 +275,9 @@ async def _check_new_agent_messages(trigger: AgentTrigger) -> bool: # --- Agent-to-agent message check (existing logic) --- from app.models.participant import Participant from app.models.agent import Agent as AgentModel + safe_agent_name = from_agent_name.replace("%", "").replace("_", r"\_") agent_r = await db.execute( - select(AgentModel).where(AgentModel.name.ilike(f"%{from_agent_name}%")) + select(AgentModel).where(AgentModel.name.ilike(f"%{safe_agent_name}%")) ) source_agent = agent_r.scalars().first() if not source_agent: @@ -313,13 +324,14 @@ async def _check_new_agent_messages(trigger: AgentTrigger) -> bool: # Look up user by display name or username within tenant from sqlalchemy import or_ from app.models.user import User, Identity + safe_user_name = from_user_name.replace("%", "").replace("_", r"\_") query = ( select(User) .join(User.identity) .where( or_( - User.display_name.ilike(f"%{from_user_name}%"), - Identity.username.ilike(f"%{from_user_name}%"), + User.display_name.ilike(f"%{safe_user_name}%"), + Identity.username.ilike(f"%{safe_user_name}%"), ) ) ) @@ -500,6 +512,8 @@ async def on_tool_call(data): except Exception as e: logger.warning(f"Failed to persist tool call for trigger session: {e}") + _is_a2a_wake = all(t.name == "a2a_wake" for t in triggers) + reply = await call_llm( model=model, messages=messages, @@ -509,6 +523,7 @@ async def on_tool_call(data): user_id=agent.creator_id, on_chunk=on_chunk, on_tool_call=on_tool_call, + max_tool_rounds_override=2 if _is_a2a_wake else None, ) # Save assistant reply to Reflection session @@ -535,14 +550,57 @@ async def on_tool_call(data): # Push trigger result to user's active 
WebSocket connections final_reply = reply or "".join(collected_content) - if final_reply: + + is_a2a_internal = all(t.name == "a2a_wake" for t in triggers) + + if final_reply and not is_a2a_internal: try: from app.api.websocket import manager as ws_manager agent_id_str = str(agent_id) # Build notification message with trigger badge - trigger_badge = ", ".join(trigger_names) - notification = f"⚡ **触发器触发** `{trigger_badge}`\n\n{final_reply}" + trigger_reasons = [] + for t in triggers: + ns = (t.config or {}).get("_notification_summary", "").strip() + if ns: + trigger_reasons.append(ns) + else: + r = (t.reason or "").strip() + if r and len(r) <= 80: + trigger_reasons.append(r) + elif r: + trigger_reasons.append(r[:77] + "...") + summary = trigger_reasons[0] if trigger_reasons else "有新的事件需要处理" + + _is_a2a_wait = any(t.name.startswith("a2a_wait_") for t in triggers) + if _is_a2a_wait: + import re as _re + cleaned = final_reply + _internal_patterns = [ + r'\b(a2a_wait_\w+|a2a_wake)\b', + r'\bwait_?\w+_?(task|reply|followup|meeting|sync|api_key)\w*\b', + r'\bresolve_\w+\b', + r'\bfocus[_ ]?item\b', + r'\btask_delegate\b', + r'\bfocus_ref\b', + r'✅\s*(a2a\w+|wait\w+|触发器\w*|focus\w*).*(?:已取消|已为|保持|活跃|完成状态)[^\n]*', + r'[\-•]\s*(?:触发器|trigger|focus|wait_\w+|a2a\w+).*[^\n]*', + r'(?:触发器|trigger)\s+\S+\s*(?:已取消|保持活跃|已为完成状态|fired)', + r'已静默清理触发器', + r'已静默处理完毕', + r'继续待命[。,]?\s*', + r',?\s*(?:继续)?待命。', + ] + for _pat in _internal_patterns: + cleaned = _re.sub(_pat, '', cleaned, flags=_re.IGNORECASE) + cleaned = _re.sub(r'\n{3,}', '\n\n', cleaned).strip() + cleaned = _re.sub(r'[。,]\s*$', '', cleaned).strip() + if not cleaned: + cleaned = final_reply + else: + cleaned = final_reply + + notification = f"⚡ {summary}\n\n{cleaned}" # Save to user's active chat session(s) for persistence async with async_session() as db: @@ -693,6 +751,72 @@ async def _tick(): asyncio.create_task(_invoke_agent_for_triggers(agent_id, agent_triggers)) +async def wake_agent_with_context(agent_id: 
uuid.UUID, message_context: str, *, from_agent_id: uuid.UUID | None = None, skip_dedup: bool = False) -> None:
+    """Public API: wake an agent asynchronously with a message context.
+
+    Creates a synthetic trigger invocation so the agent processes the
+    message in a Reflection Session via the standard trigger path.
+    Safe to call from any async context.
+
+    Args:
+        agent_id: The agent to wake.
+        message_context: The message to deliver.
+        from_agent_id: The agent that initiated this wake (for chain depth tracking).
+        skip_dedup: If True, bypass the dedup window check. Use this for
+            genuine message deliveries (e.g. a task_delegate callback)
+            where skipping the wake would lose a real message.
+    """
+    now = datetime.now(timezone.utc)
+
+    if from_agent_id:
+        chain_key = f"{from_agent_id}->{agent_id}"
+        current_depth = _A2A_WAKE_CHAIN.get(chain_key, 0)
+        if current_depth >= _A2A_MAX_WAKE_DEPTH:
+            logger.warning(
+                f"[A2A] Wake chain depth {current_depth} reached for {chain_key}, "
+                f"stopping to prevent wake storm"
+            )
+            return
+
+        _A2A_WAKE_CHAIN[chain_key] = current_depth + 1
+
+        def _decay_chain():
+            _A2A_WAKE_CHAIN.pop(chain_key, None)
+        asyncio.get_running_loop().call_later(_A2A_WAKE_CHAIN_TTL, _decay_chain)
+
+    if not skip_dedup and agent_id in _last_invoke:
+        elapsed = (now - _last_invoke[agent_id]).total_seconds()
+        if elapsed < DEDUP_WINDOW:
+            logger.info(
+                f"[A2A] Skipping wake for agent {agent_id} — "
+                f"invoked {elapsed:.0f}s ago (dedup window {DEDUP_WINDOW}s)"
+            )
+            return
+
+    _last_invoke[agent_id] = now
+
+    dummy_trigger = AgentTrigger(
+        id=uuid.uuid4(),
+        agent_id=agent_id,
+        name="a2a_wake",
+        type="on_message",
+        config={"from_agent_name": "", "_matched_message": message_context[:2000], "_matched_from": "agent"},
+        reason=(
+            "You received a notification from another agent. "
+            "Read the message content above, update your focus and memory if needed, "
+            "and take any action you deem necessary. 
" + "Do NOT reply back to the sender unless you have a genuine question — " + "this was a notification, not a request for response." + ), + is_enabled=True, + last_fired_at=now, + fire_count=0, + ) + asyncio.create_task(_invoke_agent_for_triggers(agent_id, [dummy_trigger])) + + async def start_trigger_daemon(): """Start the background trigger daemon loop. Called from FastAPI startup.""" logger.info("⚡ Trigger Daemon started (15s tick, heartbeat every ~60s)") @@ -709,6 +833,7 @@ async def start_trigger_daemon(): _heartbeat_counter += 1 if _heartbeat_counter >= 4: _heartbeat_counter = 0 + _cleanup_stale_invoke_cache() try: from app.services.heartbeat import _heartbeat_tick await _heartbeat_tick() diff --git a/backend/tests/test_a2a_msg_type.py b/backend/tests/test_a2a_msg_type.py new file mode 100644 index 000000000..5fc87c7c4 --- /dev/null +++ b/backend/tests/test_a2a_msg_type.py @@ -0,0 +1,639 @@ +"""Tests for async A2A msg_type differentiation (notify/consult/task_delegate). + +Validates the branching logic in _send_message_to_agent: +- notify: fire-and-forget, returns immediately +- task_delegate: async with callback, creates focus + trigger +- consult: synchronous request-response (original behaviour) +""" + +import json +import uuid +from datetime import UTC, datetime +from types import SimpleNamespace +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + + +# ── Helpers ────────────────────────────────────────────────────────── + +class DummyResult: + def __init__(self, values=None, scalar_value=None, scalars_list=None): + self._values = list(values or []) + self._scalar_value = scalar_value + self._scalars_list = scalars_list + + def scalar_one_or_none(self): + if self._scalar_value is not None: + return self._scalar_value + return self._values[0] if self._values else None + + def scalars(self): + return self + + def all(self): + return list(self._scalars_list or self._values) + + def first(self): + if self._scalars_list: + return 
self._scalars_list[0] if self._scalars_list else None + return self._values[0] if self._values else None + + def scalar(self): + if self._scalar_value is not None: + return self._scalar_value + return self._values[0] if self._values else None + + +class RecordingDB: + def __init__(self, responses=None): + self.responses = list(responses or []) + self.added = [] + self.committed = False + self.flushed = False + + async def execute(self, _statement, _params=None): + if not self.responses: + raise AssertionError("unexpected execute() call") + return self.responses.pop(0) + + def add(self, value): + self.added.append(value) + + async def commit(self): + self.committed = True + + async def flush(self): + self.flushed = True + + +def _make_agent(agent_id=None, name="TestAgent", tenant_id=None, agent_type="native", + expired=False, primary_model_id=None): + agent = MagicMock() + agent.id = agent_id or uuid.uuid4() + agent.name = name + agent.tenant_id = tenant_id or uuid.uuid4() + agent.agent_type = agent_type + agent.is_expired = expired + agent.expires_at = None + agent.creator_id = uuid.uuid4() + agent.primary_model_id = primary_model_id + agent.fallback_model_id = None + agent.role_description = "" + agent.max_tool_rounds = 50 + return agent + + +def _make_participant(part_id=None, ref_id=None): + p = MagicMock() + p.id = part_id or uuid.uuid4() + p.type = "agent" + p.ref_id = ref_id or uuid.uuid4() + return p + + +def _make_tenant(a2a_async_enabled=True): + t = MagicMock() + t.a2a_async_enabled = a2a_async_enabled + return t + + +# ── Tests ──────────────────────────────────────────────────────────── + +@pytest.mark.asyncio +async def test_notify_returns_immediately(): + """notify msg_type should return immediately without calling LLM.""" + from app.services.agent_tools import _send_message_to_agent + + from_agent_id = uuid.uuid4() + target_id = uuid.uuid4() + rel_id = uuid.uuid4() + session_id = uuid.uuid4() + src_participant = 
_make_participant(ref_id=from_agent_id) + tgt_participant = _make_participant(ref_id=target_id) + source_agent = _make_agent(from_agent_id, name="Alice") + target_agent = _make_agent(target_id, name="Bob") + + session = MagicMock() + session.id = session_id + session.last_message_at = None + + db = RecordingDB(responses=[ + DummyResult(scalar_value=source_agent), + DummyResult(scalars_list=[target_agent]), + DummyResult(scalar_value=rel_id), + DummyResult(scalar_value=src_participant), + DummyResult(scalar_value=tgt_participant), + DummyResult(scalar_value=session), + DummyResult(scalar_value=_make_tenant()), + ]) + + with patch("app.services.agent_tools.async_session") as mock_session_ctx, \ + patch("app.services.agent_tools._wake_agent_async", new_callable=AsyncMock) as mock_wake: + + mock_session_ctx.return_value.__aenter__ = AsyncMock(return_value=db) + mock_session_ctx.return_value.__aexit__ = AsyncMock(return_value=False) + + result = await _send_message_to_agent(from_agent_id, { + "agent_name": "Bob", + "message": "Please review the document", + "msg_type": "notify", + }) + + assert "Notification sent to Bob" in result + assert "asynchronously" in result + mock_wake.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_task_delegate_creates_focus_and_trigger(): + """task_delegate should create a focus item and an on_message trigger.""" + from app.services.agent_tools import _send_message_to_agent + + from_agent_id = uuid.uuid4() + target_id = uuid.uuid4() + rel_id = uuid.uuid4() + session_id = uuid.uuid4() + src_participant = _make_participant(ref_id=from_agent_id) + tgt_participant = _make_participant(ref_id=target_id) + source_agent = _make_agent(from_agent_id, name="Alice") + target_agent = _make_agent(target_id, name="Bob") + + session = MagicMock() + session.id = session_id + session.last_message_at = None + + db = RecordingDB(responses=[ + DummyResult(scalar_value=source_agent), + DummyResult(scalars_list=[target_agent]), + 
DummyResult(scalar_value=rel_id), + DummyResult(scalar_value=src_participant), + DummyResult(scalar_value=tgt_participant), + DummyResult(scalar_value=session), + DummyResult(scalar_value=_make_tenant()), + ]) + + with patch("app.services.agent_tools.async_session") as mock_session_ctx, \ + patch("app.services.agent_tools._append_focus_item", new_callable=AsyncMock) as mock_focus, \ + patch("app.services.agent_tools._create_on_message_trigger", new_callable=AsyncMock) as mock_trigger, \ + patch("app.services.agent_tools._wake_agent_async", new_callable=AsyncMock) as mock_wake: + + mock_session_ctx.return_value.__aenter__ = AsyncMock(return_value=db) + mock_session_ctx.return_value.__aexit__ = AsyncMock(return_value=False) + + result = await _send_message_to_agent(from_agent_id, { + "agent_name": "Bob", + "message": "Please prepare the Q3 report", + "msg_type": "task_delegate", + }) + + assert "Task delegated to Bob" in result + assert "notified when they complete" in result + mock_focus.assert_awaited_once() + mock_trigger.assert_awaited_once() + mock_wake.assert_awaited_once() + + focus_call = mock_focus.call_args + assert "wait_bob_task" in focus_call[0][1] + assert "Bob" in focus_call[0][2] + + trigger_call = mock_trigger.call_args + assert trigger_call[1]["from_agent_name"] == "Bob" + assert trigger_call[1]["focus_ref"] == focus_call[0][1] + + +@pytest.mark.asyncio +async def test_consult_calls_llm_synchronously(): + """consult msg_type should call LLM synchronously and return reply.""" + from app.services.agent_tools import _send_message_to_agent + + from_agent_id = uuid.uuid4() + target_id = uuid.uuid4() + rel_id = uuid.uuid4() + session_id = uuid.uuid4() + model_id = uuid.uuid4() + src_participant = _make_participant(ref_id=from_agent_id) + tgt_participant = _make_participant(ref_id=target_id) + source_agent = _make_agent(from_agent_id, name="Alice") + target_agent = _make_agent(target_id, name="Bob", primary_model_id=model_id) + + session = MagicMock() + 
session.id = session_id + session.last_message_at = None + + model = MagicMock() + model.provider = "openai" + model.model = "gpt-4" + model.api_key_encrypted = "sk-test" + model.base_url = None + model.temperature = 0.7 + model.request_timeout = 60 + + response = MagicMock() + response.content = "Here is the answer" + response.tool_calls = None + response.usage = None + + mock_llm_client = AsyncMock() + mock_llm_client.complete = AsyncMock(return_value=response) + mock_llm_client.close = AsyncMock() + + db = RecordingDB(responses=[ + DummyResult(scalar_value=source_agent), + DummyResult(scalars_list=[target_agent]), + DummyResult(scalar_value=rel_id), + DummyResult(scalar_value=src_participant), + DummyResult(scalar_value=tgt_participant), + DummyResult(scalar_value=session), + DummyResult(scalar_value=_make_tenant()), + DummyResult(scalar_value=model), + DummyResult(scalars_list=[]), + ]) + + db2 = RecordingDB(responses=[ + DummyResult(scalar_value=tgt_participant), + ]) + + call_count = 0 + session_dbs = [db, db2] + + async def mock_session_enter(self): + nonlocal call_count + result = session_dbs[min(call_count, len(session_dbs) - 1)] + call_count += 1 + return result + + with patch("app.services.agent_tools.async_session") as mock_session_ctx, \ + patch("app.services.agent_context.build_agent_context", new_callable=AsyncMock, return_value=("static", "dynamic")), \ + patch("app.services.llm_utils.create_llm_client", return_value=mock_llm_client), \ + patch("app.services.agent_tools.get_agent_tools_for_llm", new_callable=AsyncMock, return_value=[]), \ + patch("app.services.llm_utils.get_provider_base_url", return_value="https://api.openai.com/v1"), \ + patch("app.services.token_tracker.record_token_usage", new_callable=AsyncMock), \ + patch("app.services.activity_logger.log_activity", new_callable=AsyncMock): + + mock_session_ctx.return_value.__aenter__ = AsyncMock(side_effect=[ + db, + db2, + ]) + mock_session_ctx.return_value.__aexit__ = 
AsyncMock(return_value=False) + + result = await _send_message_to_agent(from_agent_id, { + "agent_name": "Bob", + "message": "What is 2+2?", + "msg_type": "consult", + }) + + assert "Bob replied" in result + assert "Here is the answer" in result + mock_llm_client.complete.assert_awaited() + + +@pytest.mark.asyncio +async def test_default_msg_type_is_notify(): + """When msg_type is not specified, it should default to notify.""" + from app.services.agent_tools import _send_message_to_agent + + from_agent_id = uuid.uuid4() + target_id = uuid.uuid4() + rel_id = uuid.uuid4() + session_id = uuid.uuid4() + src_participant = _make_participant(ref_id=from_agent_id) + tgt_participant = _make_participant(ref_id=target_id) + source_agent = _make_agent(from_agent_id, name="Alice") + target_agent = _make_agent(target_id, name="Bob") + + session = MagicMock() + session.id = session_id + session.last_message_at = None + + db = RecordingDB(responses=[ + DummyResult(scalar_value=source_agent), + DummyResult(scalars_list=[target_agent]), + DummyResult(scalar_value=rel_id), + DummyResult(scalar_value=src_participant), + DummyResult(scalar_value=tgt_participant), + DummyResult(scalar_value=session), + DummyResult(scalar_value=_make_tenant()), + ]) + + with patch("app.services.agent_tools.async_session") as mock_session_ctx, \ + patch("app.services.agent_tools._wake_agent_async", new_callable=AsyncMock) as mock_wake: + + mock_session_ctx.return_value.__aenter__ = AsyncMock(return_value=db) + mock_session_ctx.return_value.__aexit__ = AsyncMock(return_value=False) + + result = await _send_message_to_agent(from_agent_id, { + "agent_name": "Bob", + "message": "Heads up about the meeting", + }) + + assert "Notification sent" in result + mock_wake.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_missing_agent_name_returns_error(): + """Missing agent_name should return an error.""" + from app.services.agent_tools import _send_message_to_agent + + result = await 
_send_message_to_agent(uuid.uuid4(), { + "agent_name": "", + "message": "Hello", + }) + + assert "❌" in result + + +@pytest.mark.asyncio +async def test_no_relationship_returns_error(): + """No relationship between agents should return an error.""" + from app.services.agent_tools import _send_message_to_agent + + from_agent_id = uuid.uuid4() + target_id = uuid.uuid4() + source_agent = _make_agent(from_agent_id, name="Alice") + target_agent = _make_agent(target_id, name="Bob") + src_participant = _make_participant(ref_id=from_agent_id) + tgt_participant = _make_participant(ref_id=target_id) + + db = RecordingDB(responses=[ + DummyResult(scalar_value=source_agent), + DummyResult(scalars_list=[target_agent]), + DummyResult(scalar_value=None), + DummyResult(scalar_value=src_participant), + DummyResult(scalar_value=tgt_participant), + ]) + + with patch("app.services.agent_tools.async_session") as mock_session_ctx: + mock_session_ctx.return_value.__aenter__ = AsyncMock(return_value=db) + mock_session_ctx.return_value.__aexit__ = AsyncMock(return_value=False) + + result = await _send_message_to_agent(from_agent_id, { + "agent_name": "Bob", + "message": "Hello", + "msg_type": "notify", + }) + + assert "do not have a relationship" in result + + +@pytest.mark.asyncio +async def test_append_focus_item_creates_file(tmp_path): + """_append_focus_item should create/append to focus.md.""" + from app.services.agent_tools import _append_focus_item, WORKSPACE_ROOT + + agent_id = uuid.uuid4() + with patch("app.services.agent_tools.WORKSPACE_ROOT", tmp_path): + await _append_focus_item(agent_id, "test_item", "Test description") + + focus_path = tmp_path / str(agent_id) / "focus.md" + assert focus_path.exists() + content = focus_path.read_text() + assert "test_item" in content + assert "Test description" in content + assert "- [ ]" in content + + +@pytest.mark.asyncio +async def test_append_focus_item_no_duplicate(tmp_path): + """_append_focus_item should not duplicate existing 
items.""" + from app.services.agent_tools import _append_focus_item + + agent_id = uuid.uuid4() + focus_path = tmp_path / str(agent_id) / "focus.md" + focus_path.parent.mkdir(parents=True, exist_ok=True) + focus_path.write_text("# Focus\n\n- [ ] test_item: Existing description\n") + + with patch("app.services.agent_tools.WORKSPACE_ROOT", tmp_path): + await _append_focus_item(agent_id, "test_item", "New description") + + content = focus_path.read_text() + assert content.count("test_item") == 1 + + +@pytest.mark.asyncio +async def test_create_on_message_trigger(): + """_create_on_message_trigger should create a trigger in DB.""" + from app.services.agent_tools import _create_on_message_trigger + + agent_id = uuid.uuid4() + + snap_db = RecordingDB(responses=[ + DummyResult(scalar_value=None), + ]) + trigger_db = RecordingDB(responses=[ + DummyResult(scalar_value=None), + ]) + + enter_count = 0 + dbs = [snap_db, trigger_db] + + async def _enter(): + nonlocal enter_count + db = dbs[min(enter_count, len(dbs) - 1)] + enter_count += 1 + return db + + with patch("app.services.agent_tools.async_session") as mock_session_ctx: + mock_session_ctx.return_value.__aenter__ = AsyncMock(side_effect=_enter) + mock_session_ctx.return_value.__aexit__ = AsyncMock(return_value=False) + + await _create_on_message_trigger( + agent_id=agent_id, + trigger_name="test_trigger", + from_agent_name="Bob", + reason="Test reason", + focus_ref="test_focus", + ) + + assert trigger_db.committed + assert len(trigger_db.added) == 1 + + trigger = trigger_db.added[0] + assert trigger.name == "test_trigger" + assert trigger.type == "on_message" + assert trigger.config["from_agent_name"] == "Bob" + assert trigger.reason == "Test reason" + assert trigger.focus_ref == "test_focus" + + +@pytest.mark.asyncio +async def test_wake_agent_async_calls_trigger_daemon(): + """_wake_agent_async should delegate to trigger_daemon.wake_agent_with_context.""" + from app.services.agent_tools import _wake_agent_async + + 
agent_id = uuid.uuid4() + context = "[From Alice] Hello Bob" + + with patch("app.services.trigger_daemon.wake_agent_with_context", new_callable=AsyncMock) as mock_wake: + await _wake_agent_async(agent_id, context) + mock_wake.assert_awaited_once_with(agent_id, context, from_agent_id=None, skip_dedup=False) + + +@pytest.mark.asyncio +async def test_openclaw_target_still_queues(): + """OpenClaw targets should still use the gateway queue regardless of msg_type.""" + from app.services.agent_tools import _send_message_to_agent + + from_agent_id = uuid.uuid4() + target_id = uuid.uuid4() + rel_id = uuid.uuid4() + session_id = uuid.uuid4() + src_participant = _make_participant(ref_id=from_agent_id) + tgt_participant = _make_participant(ref_id=target_id) + source_agent = _make_agent(from_agent_id, name="Alice") + target_agent = _make_agent(target_id, name="OpenClawBot", agent_type="openclaw") + target_agent.openclaw_last_seen = datetime.now(UTC) + + session = MagicMock() + session.id = session_id + session.last_message_at = None + + db = RecordingDB(responses=[ + DummyResult(scalar_value=source_agent), + DummyResult(scalars_list=[target_agent]), + DummyResult(scalar_value=rel_id), + DummyResult(scalar_value=src_participant), + DummyResult(scalar_value=tgt_participant), + DummyResult(scalar_value=session), + ]) + + with patch("app.services.agent_tools.async_session") as mock_session_ctx, \ + patch("app.services.activity_logger.log_activity", new_callable=AsyncMock): + + mock_session_ctx.return_value.__aenter__ = AsyncMock(return_value=db) + mock_session_ctx.return_value.__aexit__ = AsyncMock(return_value=False) + + result = await _send_message_to_agent(from_agent_id, { + "agent_name": "OpenClawBot", + "message": "Hello", + "msg_type": "notify", + }) + + assert "OpenClaw agent" in result + assert "queued" in result + + +@pytest.mark.asyncio +async def test_feature_flag_off_falls_back_to_consult(): + """When tenant a2a_async_enabled=False, notify and task_delegate fall back to 
consult.""" + from app.services.agent_tools import _send_message_to_agent + + from_agent_id = uuid.uuid4() + target_id = uuid.uuid4() + model_id = uuid.uuid4() + rel_id = uuid.uuid4() + session_id = uuid.uuid4() + src_participant = _make_participant(ref_id=from_agent_id) + tgt_participant = _make_participant(ref_id=target_id) + source_agent = _make_agent(from_agent_id, name="Alice") + source_agent.tenant_id = uuid.uuid4() + target_agent = _make_agent(target_id, name="Bob", primary_model_id=model_id) + + tenant = MagicMock() + tenant.a2a_async_enabled = False + + session = MagicMock() + session.id = session_id + session.last_message_at = None + + model = MagicMock() + model.provider = "openai" + model.model = "gpt-4" + model.api_key_encrypted = "sk-test" + model.base_url = None + model.temperature = 0.7 + model.request_timeout = 60 + + response = MagicMock() + response.content = "Got it" + response.tool_calls = None + response.usage = None + + mock_llm_client = AsyncMock() + mock_llm_client.complete = AsyncMock(return_value=response) + mock_llm_client.close = AsyncMock() + + db = RecordingDB(responses=[ + DummyResult(scalar_value=source_agent), + DummyResult(scalars_list=[target_agent]), + DummyResult(scalar_value=rel_id), + DummyResult(scalar_value=src_participant), + DummyResult(scalar_value=tgt_participant), + DummyResult(scalar_value=session), + DummyResult(scalar_value=tenant), + DummyResult(scalar_value=model), + DummyResult(scalars_list=[]), + ]) + + db2 = RecordingDB(responses=[ + DummyResult(scalar_value=tgt_participant), + ]) + + with patch("app.services.agent_tools.async_session") as mock_session_ctx, \ + patch("app.services.agent_context.build_agent_context", new_callable=AsyncMock, return_value=("s", "d")), \ + patch("app.services.llm_utils.create_llm_client", return_value=mock_llm_client), \ + patch("app.services.agent_tools.get_agent_tools_for_llm", new_callable=AsyncMock, return_value=[]), \ + patch("app.services.llm_utils.get_provider_base_url", 
return_value="https://api.openai.com/v1"), \ + patch("app.services.token_tracker.record_token_usage", new_callable=AsyncMock), \ + patch("app.services.activity_logger.log_activity", new_callable=AsyncMock): + + mock_session_ctx.return_value.__aenter__ = AsyncMock(side_effect=[db, db2]) + mock_session_ctx.return_value.__aexit__ = AsyncMock(return_value=False) + + result = await _send_message_to_agent(from_agent_id, { + "agent_name": "Bob", + "message": "Hello", + "msg_type": "notify", + }) + + assert "Bob replied" in result + assert "Got it" in result + + +@pytest.mark.asyncio +async def test_feature_flag_on_uses_notify(): + """When tenant a2a_async_enabled=True, notify works normally.""" + from app.services.agent_tools import _send_message_to_agent + + from_agent_id = uuid.uuid4() + target_id = uuid.uuid4() + rel_id = uuid.uuid4() + session_id = uuid.uuid4() + src_participant = _make_participant(ref_id=from_agent_id) + tgt_participant = _make_participant(ref_id=target_id) + source_agent = _make_agent(from_agent_id, name="Alice") + source_agent.tenant_id = uuid.uuid4() + target_agent = _make_agent(target_id, name="Bob") + + tenant = MagicMock() + tenant.a2a_async_enabled = True + + session = MagicMock() + session.id = session_id + session.last_message_at = None + + db = RecordingDB(responses=[ + DummyResult(scalar_value=source_agent), + DummyResult(scalars_list=[target_agent]), + DummyResult(scalar_value=rel_id), + DummyResult(scalar_value=src_participant), + DummyResult(scalar_value=tgt_participant), + DummyResult(scalar_value=session), + DummyResult(scalar_value=tenant), + ]) + + with patch("app.services.agent_tools.async_session") as mock_session_ctx, \ + patch("app.services.agent_tools._wake_agent_async", new_callable=AsyncMock) as mock_wake: + + mock_session_ctx.return_value.__aenter__ = AsyncMock(return_value=db) + mock_session_ctx.return_value.__aexit__ = AsyncMock(return_value=False) + + result = await _send_message_to_agent(from_agent_id, { + "agent_name": 
"Bob", + "message": "Hello", + "msg_type": "notify", + }) + + assert "Notification sent" in result + mock_wake.assert_awaited_once() diff --git a/frontend/VERSION b/frontend/VERSION index 53adb84c8..8f1278c4f 100644 --- a/frontend/VERSION +++ b/frontend/VERSION @@ -1 +1 @@ -1.8.2 +1.8.3-beta diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index b7ca3aea8..c9288aa49 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -12,7 +12,6 @@ import Dashboard from './pages/Dashboard'; import Plaza from './pages/Plaza'; import AgentDetail from './pages/AgentDetail'; import AgentCreate from './pages/AgentCreate'; -import Chat from './pages/Chat'; import Messages from './pages/Messages'; import EnterpriseSettings from './pages/EnterpriseSettings'; import InvitationCodes from './pages/InvitationCodes'; @@ -188,7 +187,8 @@ export default function App() { } /> } /> } /> - } /> + {/* NOTE: Chat is a tab inside AgentDetail (#chat), not a separate route. + The deprecated /agents/:id/chat path is intentionally removed. 
*/} } /> } /> } /> diff --git a/frontend/src/components/FileBrowser.tsx b/frontend/src/components/FileBrowser.tsx index 9d852d7a0..b74b13d4b 100644 --- a/frontend/src/components/FileBrowser.tsx +++ b/frontend/src/components/FileBrowser.tsx @@ -7,6 +7,7 @@ import { useState, useEffect, useCallback, useRef } from 'react'; import { useTranslation } from 'react-i18next'; import MarkdownRenderer from './MarkdownRenderer'; +import { useDropZone } from '../hooks/useDropZone'; // ─── Types ───────────────────────────────────────────── @@ -103,6 +104,8 @@ export default function FileBrowser({ const [uploadProgress, setUploadProgress] = useState<{ fileName: string; percent: number } | null>(null); const textareaRef = useRef(null); + + // Auto-resize textarea to match content height useEffect(() => { const el = textareaRef.current; @@ -119,7 +122,7 @@ export default function FileBrowser({ setTimeout(() => setToast(null), 3000); }, []); - // ─── Load files ─────────────────────────────────── + const reload = useCallback(async () => { if (singleFile) { @@ -146,6 +149,32 @@ export default function FileBrowser({ setLoading(false); }, [api, currentPath, singleFile, fileFilter]); + // ─── Drag-and-drop upload ───────────────────── + const handleDroppedFiles = useCallback(async (files: File[]) => { + if (!api.upload || files.length === 0) return; + try { + for (const file of files) { + setUploadProgress({ fileName: file.name, percent: 0 }); + await api.upload(file, currentPath, (pct) => { + setUploadProgress({ fileName: file.name, percent: pct }); + }); + } + setUploadProgress(null); + reload(); + onRefresh?.(); + showToast(t('agent.upload.success', 'Upload successful')); + } catch (err: any) { + setUploadProgress(null); + showToast(t('agent.upload.failed', 'Upload failed') + ': ' + (err.message || ''), 'error'); + } + }, [api, currentPath, reload, onRefresh, showToast, t]); + + const { isDragging, dropZoneProps } = useDropZone({ + onDrop: handleDroppedFiles, + disabled: !upload || 
!api.upload || !!singleFile || !!viewing || readOnly, + accept: uploadAccept, + }); + useEffect(() => { reload(); }, [reload]); // ─── Load file content when viewing ─────────────── @@ -468,7 +497,15 @@ export default function FileBrowser({ // FILE LIST / BROWSER MODE // ═══════════════════════════════════════════════════ return ( -
+
+ {/* Drop overlay */} + {isDragging && ( +
+
+
{t('agent.workspace.dragOrClick', 'Drop files to upload')}
+
+ )} + {/* Toolbar */}
{title &&

{title}

} @@ -514,7 +551,9 @@ export default function FileBrowser({
) : files.length === 0 ? (
- {t('common.noData')} + {upload && api.upload + ? t('agent.workspace.dragOrClick', 'Drop files here or click Upload') + : t('common.noData')}
) : (
diff --git a/frontend/src/hooks/useDropZone.ts b/frontend/src/hooks/useDropZone.ts
new file mode 100644
index 000000000..630dfbe41
--- /dev/null
+++ b/frontend/src/hooks/useDropZone.ts
@@ -0,0 +1,121 @@
+/**
+ * useDropZone — reusable drag-and-drop file upload hook.
+ *
+ * Uses a counter-based approach to handle nested elements correctly:
+ * dragenter/dragleave fire on every child element, so a simple boolean
+ * would flicker. The counter increments on dragenter and decrements on
+ * dragleave; isDragging is true when counter > 0.
+ */
+import { useState, useRef, useCallback, type DragEvent } from 'react';
+
+export interface UseDropZoneOptions {
+  /** Callback when files are dropped. Receives the filtered file list. */
+  onDrop: (files: File[]) => void;
+  /** When true, the drop zone is inactive (no visual feedback, drops ignored). */
+  disabled?: boolean;
+  /**
+   * Optional comma-separated list of accepted MIME types or extensions.
+   * e.g. ".json" or "image/*,.pdf"
+   * Files not matching are silently filtered out.
+   */
+  accept?: string;
+}
+
+export interface UseDropZoneReturn {
+  /** True when a drag-with-files is hovering over the zone. */
+  isDragging: boolean;
+  /** Spread these onto the container element acting as the drop zone. */
+  dropZoneProps: {
+    onDragEnter: (e: DragEvent) => void;
+    onDragOver: (e: DragEvent) => void;
+    onDragLeave: (e: DragEvent) => void;
+    onDrop: (e: DragEvent) => void;
+  };
+}
+
+/** Check whether a drag event contains files (vs plain text / URLs). */
+function hasFiles(e: DragEvent): boolean {
+  if (e.dataTransfer?.types) {
+    for (const t of Array.from(e.dataTransfer.types)) {
+      if (t === 'Files') return true;
+    }
+  }
+  return false;
+}
+
+/** Filter a FileList by an accept string (same format as <input accept>).
*/ +function filterFiles(files: FileList, accept?: string): File[] { + const list = Array.from(files); + if (!accept) return list; + + const tokens = accept.split(',').map(t => t.trim().toLowerCase()); + + return list.filter(file => { + const ext = '.' + (file.name.split('.').pop() || '').toLowerCase(); + const mime = file.type.toLowerCase(); + + return tokens.some(token => { + if (token.startsWith('.')) return ext === token; + if (token.endsWith('/*')) return mime.startsWith(token.slice(0, -1)); + return mime === token; + }); + }); +} + +export function useDropZone({ onDrop, disabled = false, accept }: UseDropZoneOptions): UseDropZoneReturn { + const [isDragging, setIsDragging] = useState(false); + const counterRef = useRef(0); + + const handleDragEnter = useCallback((e: DragEvent) => { + e.preventDefault(); + e.stopPropagation(); + if (disabled || !hasFiles(e)) return; + counterRef.current += 1; + if (counterRef.current === 1) setIsDragging(true); + }, [disabled]); + + const handleDragOver = useCallback((e: DragEvent) => { + e.preventDefault(); + e.stopPropagation(); + if (!disabled && hasFiles(e)) { + e.dataTransfer.dropEffect = 'copy'; + } + }, [disabled]); + + const handleDragLeave = useCallback((e: DragEvent) => { + e.preventDefault(); + e.stopPropagation(); + if (disabled) return; + counterRef.current -= 1; + if (counterRef.current <= 0) { + counterRef.current = 0; + setIsDragging(false); + } + }, [disabled]); + + const handleDrop = useCallback((e: DragEvent) => { + e.preventDefault(); + e.stopPropagation(); + counterRef.current = 0; + setIsDragging(false); + if (disabled) return; + + const rawFiles = e.dataTransfer?.files; + if (!rawFiles || rawFiles.length === 0) return; + + const filtered = filterFiles(rawFiles, accept); + if (filtered.length > 0) { + onDrop(filtered); + } + }, [disabled, accept, onDrop]); + + return { + isDragging, + dropZoneProps: { + onDragEnter: handleDragEnter, + onDragOver: handleDragOver, + onDragLeave: handleDragLeave, + onDrop: 
handleDrop, + }, + }; +} diff --git a/frontend/src/i18n/en.json b/frontend/src/i18n/en.json index a5be3fd99..adcb5f37f 100644 --- a/frontend/src/i18n/en.json +++ b/frontend/src/i18n/en.json @@ -911,6 +911,15 @@ "invites": "Invitation Codes", "identity": "OA Management" }, + "a2aAsync": { + "title": "Agent-to-Agent Async Communication", + "description": "Enable agents to communicate asynchronously with three modes: notify (one-way announcement), task_delegate (delegate work and get results back), and consult (synchronous question). When disabled, all agent-to-agent messages use synchronous consult mode — the same behavior as before this feature was introduced.", + "enabled": "Enabled", + "disabled": "Disabled", + "enabledHint": "Agents can use notify, task_delegate, and consult modes.", + "disabledHint": "All agent messages use synchronous consult mode.", + "enableWarning": "⚠️ You are about to enable the A2A Async Communication feature (Beta).\n\nThis feature allows agents to communicate asynchronously via notify and task_delegate modes.\n\nKnown potential issues:\n• Agent replies may contain internal technical terms (trigger names, focus items, etc.)\n• task_delegate callbacks may occasionally be delayed or dropped due to rate limiting\n• Token consumption will increase because each async message triggers a separate agent session\n• Agent loops may occur if triggers are not properly configured\n\nIf you encounter any issues, please return to this page and disable the toggle to restore stable synchronous behavior.\n\nAre you sure you want to enable this feature?" 
+ }, "invites": { "pageTitle": "Invitation Codes", "pageDesc": "Manage invitation codes for platform registration.", diff --git a/frontend/src/i18n/zh.json b/frontend/src/i18n/zh.json index ee020b0e1..d3fd7c8f0 100644 --- a/frontend/src/i18n/zh.json +++ b/frontend/src/i18n/zh.json @@ -1060,13 +1060,22 @@ "audit": "审计日志", "config": "平台配置", "kb": "公司知识库", - "approvals": "审批", - "skills": "技能管理", + "approvals": "审批流程", + "skills": "技能", "quotas": "配额", - "users": "用户", + "users": "用户管理", "invites": "邀请码", "identity": "OA管理" }, + "a2aAsync": { + "title": "数字员工间异步通信", + "description": "允许数字员工之间以三种模式通信:notify(单向通知,无需回复)、task_delegate(委派任务并等待结果返回)、consult(同步问答)。关闭后,所有数字员工间的消息将使用同步 consult 模式——即此功能上线前的原有行为。", + "enabled": "已开启", + "disabled": "已关闭", + "enabledHint": "数字员工可使用 notify、task_delegate 和 consult 三种通信模式。", + "disabledHint": "所有数字员工消息使用同步 consult 模式。", + "enableWarning": "⚠️ 您即将开启数字员工间异步通信功能(测试版)。\n\n此功能允许数字员工通过 notify 和 task_delegate 模式进行异步通信。\n\n已知可能的问题:\n• 数字员工的回复中可能包含内部技术术语(触发器名称、关注项等)\n• task_delegate 回调可能因速率限制而偶尔延迟或丢失\n• Token 消耗将增加,因为每条异步消息都会触发一个独立的数字员工会话\n• 如果触发器配置不当,可能出现数字员工循环调用\n\n如遇到任何问题,请返回此页面关闭开关,即可恢复稳定的同步通信行为。\n\n确定要开启此功能吗?" 
+ }, "invites": { "pageTitle": "邀请码", "pageDesc": "管理平台注册的邀请码。", diff --git a/frontend/src/index.css b/frontend/src/index.css index 1cced4e4c..1e1a199ac 100644 --- a/frontend/src/index.css +++ b/frontend/src/index.css @@ -26,6 +26,12 @@ --accent-subtle: rgba(225, 225, 232, 0.08); --accent-text: #c0c0cc; + --segment-active-bg: var(--accent-primary); + --segment-active-text: var(--text-inverse); + + /* Select chevron — @tabler/icons chevron-down, colored to --text-secondary */ + --select-chevron: url("data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' width='24' height='24' viewBox='0 0 24 24' fill='none' stroke='%238b8b9e' stroke-width='2' stroke-linecap='round' stroke-linejoin='round'%3E%3Cpath stroke='none' d='M0 0h24v24H0z' fill='none'/%3E%3Cpath d='M6 9l6 6l6 -6'/%3E%3C/svg%3E"); + --success: #22c55e; --success-subtle: rgba(34, 197, 94, 0.12); --warning: #f59e0b; @@ -195,6 +201,12 @@ --accent-subtle: rgba(58, 58, 66, 0.08); --accent-text: #3a3a42; + --segment-active-bg: var(--bg-elevated); + --segment-active-text: var(--text-primary); + + /* Select chevron — @tabler/icons chevron-down, colored to --text-secondary */ + --select-chevron: url("data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' width='24' height='24' viewBox='0 0 24 24' fill='none' stroke='%236b6b80' stroke-width='2' stroke-linecap='round' stroke-linejoin='round'%3E%3Cpath stroke='none' d='M0 0h24v24H0z' fill='none'/%3E%3Cpath d='M6 9l6 6l6 -6'/%3E%3C/svg%3E"); + --success: #16a34a; --success-subtle: rgba(22, 163, 74, 0.08); --warning: #d97706; @@ -279,8 +291,7 @@ button { } input, -textarea, -select { +textarea { font-family: inherit; font-size: inherit; color: inherit; @@ -292,6 +303,26 @@ select { transition: border-color var(--transition-fast); } +select { + font-family: inherit; + font-size: inherit; + color: inherit; + background-color: var(--bg-elevated); + background-image: var(--select-chevron); + background-repeat: no-repeat; + background-position: right 10px center; + 
background-size: 16px; + border: 1px solid var(--border-default); + border-radius: var(--radius-md); + padding: 4px 32px 4px 12px; + outline: none; + transition: border-color var(--transition-fast); + appearance: none; + -webkit-appearance: none; + cursor: pointer; + text-overflow: ellipsis; +} + /* Theme-colored radio and checkbox */ input[type="radio"], input[type="checkbox"] { @@ -500,21 +531,21 @@ select:focus { opacity: 1; } -/* Pinned icon swap: show pin by default, unpin on hover */ +/* Pinned icon swap: show pin by default, unpin only when hovering the button itself */ .sidebar-pin-btn.pinned .pin-hover { display: none; } -.sidebar-agent-item:hover .sidebar-pin-btn.pinned .pin-default { +.sidebar-pin-btn.pinned:hover .pin-default { display: none; } -.sidebar-agent-item:hover .sidebar-pin-btn.pinned .pin-hover { +.sidebar-pin-btn.pinned:hover .pin-hover { display: inline; } -/* When pinned and hovered, turn red to indicate unpin */ -.sidebar-agent-item:hover .sidebar-pin-btn.pinned { +/* When hovering the pin button itself on a pinned agent, turn red to indicate unpin */ +.sidebar-pin-btn.pinned:hover { color: var(--error); background: var(--error-subtle); } @@ -684,6 +715,18 @@ select:focus { transition: all var(--transition-default); } +/* Chat page needs a fixed viewport height so the inner chat-messages + can scroll independently. Without this, .main-content grows with content + (min-height: 100vh) and overflow-y: auto on .chat-messages never fires. 
*/ +.main-content.chat-page { + height: 100vh; + min-height: 0; + overflow: hidden; + display: flex; + flex-direction: column; + padding: 0; +} + /* ─── Collapsed Sidebar Overrides ───────────────────── */ .sidebar.collapsed { --sidebar-width: var(--sidebar-width-collapsed); @@ -1044,23 +1087,100 @@ select:focus { border-bottom-color: var(--accent-primary); } -/* Session sidebar admin tabs — same underline pattern as .tabs/.tab, compact for narrow column */ -.tabs.session-sidebar-session-tabs { - gap: var(--space-5); - margin-bottom: var(--space-2); - margin-top: 0; - padding: 2px 12px 0; - position: relative; - background: transparent; - z-index: auto; - top: auto; +/* Session sidebar admin tabs — segment control style */ +.session-sidebar-segment-control { + display: flex; + margin: 0 12px 8px; + padding: 2px; + height: 28px; + background: var(--accent-subtle); + border-radius: 6px; + border: 1px solid var(--border-subtle); + box-sizing: border-box; } -.tabs.session-sidebar-session-tabs .tab { - flex: 0 1 auto; - padding: var(--space-2) var(--space-1) 10px; +.session-sidebar-segment-control .segment-item { + flex: 1; + display: flex; + align-items: center; + justify-content: center; font-size: 12px; + color: var(--text-tertiary); + border-radius: 4px; + cursor: pointer; + transition: all 0.15s ease; white-space: nowrap; + user-select: none; + border: none; + background: none; + padding: 0; + line-height: 1; +} + +.session-sidebar-segment-control .segment-item:hover { + color: var(--text-secondary); +} + +.session-sidebar-segment-control .segment-item.active { + background: var(--segment-active-bg); + color: var(--segment-active-text); + font-weight: 500; +} + +/* New session button */ +.new-session-btn { + width: 100%; + height: 28px; + padding: 0 10px; + background: none; + border: 1px solid var(--border-subtle); + border-radius: 6px; + cursor: pointer; + font-size: 12px; + color: var(--text-secondary); + display: flex; + align-items: center; + justify-content: 
center; + gap: 6px; + transition: all 0.15s ease; + box-sizing: border-box; + line-height: 1; +} + +.new-session-btn:hover { + background: var(--bg-secondary); + color: var(--text-primary); + border-color: var(--accent-primary); +} + +/* Session item — delete button & message count hover behaviour */ +.session-del-btn { + display: none; + align-items: center; + justify-content: center; + width: 24px; + height: 24px; + flex-shrink: 0; + padding: 0; + border: none; + border-radius: 4px; + background: none; + color: var(--text-tertiary); + cursor: pointer; + transition: color 0.35s ease-in-out, background 0.35s ease-in-out; +} + +.session-item:hover .session-del-btn { + display: flex; +} + +.session-del-btn:hover { + color: var(--status-error); + background: var(--bg-hover); +} + +.session-item:hover .session-msg-count { + display: none; } /* Page header */ @@ -1071,6 +1191,14 @@ select:focus { margin-bottom: var(--space-6); } +/* When inside the chat page (main-content.chat-page removes outer padding), + give page-header its own padding to replicate the normal main-content spacing */ +.main-content.chat-page .page-header { + padding: var(--space-8) var(--space-8) 0; + flex-shrink: 0; +} + + .page-title { font-size: var(--text-2xl); font-weight: 600; @@ -1345,7 +1473,7 @@ select:focus { .login-field input, .login-field select { height: 44px; - padding: 0 14px; + padding: 0 32px 0 14px; background: var(--bg-secondary); border: 1px solid var(--border-default); border-radius: 10px; @@ -1578,9 +1706,20 @@ select:focus { .chat-container { display: flex; flex-direction: column; - height: calc(100vh - 100px); + /* Fill the chat-page main-content area (which is padding: 0, height: 100vh). + The page-header inside Chat is ~68px; flex: 1 + min-height: 0 lets + .chat-messages own the remaining space and scroll independently. 
*/ + flex: 1; + min-height: 0; + overflow: hidden; } +/* Restore horizontal padding that main-content normally provides */ +.main-content.chat-page .chat-container { + padding: 0 var(--space-8); +} + + /* When live panel is active, switch to horizontal layout */ .chat-container.chat-with-live-panel { flex-direction: row; @@ -2441,3 +2580,66 @@ select:focus { .login-qr-back:hover { text-decoration: underline; } + +/* ─── Drop Zone Overlay (drag-and-drop upload) ───── */ + +.drop-zone-wrapper { + position: relative; +} + +.drop-zone-overlay { + position: absolute; + inset: 0; + z-index: 100; + display: flex; + align-items: center; + justify-content: center; + flex-direction: column; + gap: 8px; + background: rgba(99, 102, 241, 0.07); + border: 2px dashed rgba(99, 102, 241, 0.45); + border-radius: 12px; + pointer-events: none; + animation: dropZoneFadeIn 0.15s ease; + backdrop-filter: blur(2px); +} + +.drop-zone-overlay__icon { + width: 40px; + height: 40px; + border-radius: 50%; + background: rgba(99, 102, 241, 0.12); + display: flex; + align-items: center; + justify-content: center; + font-size: 20px; + color: var(--accent-primary); +} + +.drop-zone-overlay__text { + font-size: 13px; + font-weight: 600; + color: var(--accent-primary); + letter-spacing: 0.2px; +} + +.drop-zone-overlay__hint { + font-size: 11px; + color: var(--text-tertiary); +} + +@keyframes dropZoneFadeIn { + from { + opacity: 0; + border-color: transparent; + } + to { + opacity: 1; + border-color: rgba(99, 102, 241, 0.45); + } +} + +[data-theme="light"] .drop-zone-overlay { + background: rgba(99, 102, 241, 0.06); + border-color: rgba(99, 102, 241, 0.35); +} diff --git a/frontend/src/pages/AgentDetail.tsx b/frontend/src/pages/AgentDetail.tsx index 1a253f592..5c2168dbb 100644 --- a/frontend/src/pages/AgentDetail.tsx +++ b/frontend/src/pages/AgentDetail.tsx @@ -1,4 +1,4 @@ -import React, { useState, useEffect, useMemo, useRef, Component, ErrorInfo } from 'react'; +import React, { useState, useEffect, 
useMemo, useRef, useCallback, Component, ErrorInfo } from 'react'; import { useParams, useNavigate, useLocation } from 'react-router-dom'; import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query'; import { useTranslation } from 'react-i18next'; @@ -18,6 +18,7 @@ import { useAuthStore } from '../stores'; import { copyToClipboard } from '../utils/clipboard'; import { formatFileSize } from '../utils/formatFileSize'; import { IconPaperclip, IconSend } from '@tabler/icons-react'; +import { useDropZone } from '../hooks/useDropZone'; const TABS = ['status', 'aware', 'mind', 'tools', 'skills', 'relationships', 'workspace', 'chat', 'activityLog', 'approvals', 'settings'] as const; @@ -1515,10 +1516,14 @@ function AgentDetailInner() { const isViewingOtherUsersSessions = canViewAllAgentChatSessions && chatScope === 'all'; - /** Sessions in scope=all that are not the current viewer's own P2P rows (for admin「其他用户」tab). */ + /** Sessions in scope=all that are not the current viewer's own P2P rows (for admin「其他用户」tab). + * Agent-to-agent sessions (source_channel === 'agent') store the creator's user_id, so we must + * exempt them from the user_id check — otherwise they'd always be hidden. */ const otherUsersSessions = useMemo(() => { const vu = viewerUserIdStr(); return allSessions.filter((s: any) => { + // Always show agent-to-agent sessions in the "Other users" tab + if (String(s.source_channel || '').toLowerCase() === 'agent') return true; const su = sessionUserIdStr(s); if (vu && su === vu) return false; return true; @@ -2163,6 +2168,24 @@ function AgentDetailInner() { const resolvedAvatarText = avatarText || (resolvedSenderLabel ? resolvedSenderLabel[0] : (isLeft ? 'A' : 'U')); const showSenderLabel = !!resolvedSenderLabel && (forceSenderLabel || !!msg.sender_name); + // Parse [image_data:data:image/...;base64,...] markers from user message content. + // The backend persists these markers in the DB to preserve multimodal context + // across turns. 
They must ALWAYS be stripped from displayContent so users never + // see raw base64 strings in the chat bubble. + // Guard: only collect extracted images for thumbnail rendering when msg.imageUrl + // is NOT already set — otherwise the image is already shown via the isImage path + // and rendering again from the marker would display it twice. + const IMAGE_DATA_RE = /\[image_data:(data:image\/[^;]+;base64,[^\]]+)\]/g; + const inlineImages: string[] = []; + let displayContent = msg.content || ''; + if (displayContent.includes('[image_data:')) { + displayContent = displayContent.replace(IMAGE_DATA_RE, (_: string, dataUrl: string) => { + // Only collect for thumbnail rendering if not already shown via imageUrl + if (!msg.imageUrl) inlineImages.push(dataUrl); + return ''; // always strip the marker from displayed text + }).trim(); + } + const timestampHtml = msg.timestamp ? (() => { const d = new Date(msg.timestamp); const now = new Date(); @@ -2195,9 +2218,23 @@ function AgentDetailInner() { {msg.fileName}
))} + {/* Render images extracted from [image_data:] markers (multimodal context) */} + {inlineImages.length > 0 && ( +
+ {inlineImages.map((url, idx) => ( + attached image + ))} +
+ )} {msg.thinking && (
- 💭 Thinking + Thinking
{msg.thinking}
)} @@ -2207,8 +2244,8 @@ function AgentDetailInner() {
{t('agent.chat.thinking', 'Thinking...')}
- ) : - ) :
{msg.content}
} + ) : + ) :
{displayContent}
} {timestampHtml}
@@ -2444,6 +2481,44 @@ function AgentDetailInner() { await Promise.all(allowedFiles.map((file, i) => runOne(file, newDrafts[i]))); }; + // ── Drag-and-drop chat file upload ── + const handleDroppedChatFiles = useCallback(async (files: File[]) => { + if (!wsConnected || chatUploadDrafts.length > 0 || isWaiting || isStreaming || attachedFiles.length >= 10) return; + const availableSlots = Math.max(0, 10 - attachedFiles.length); + const filesToProcess = files.slice(0, availableSlots); + + for (const file of filesToProcess) { + const draftId = Math.random().toString(36).slice(2, 9); + const previewUrl = file.type.startsWith('image/') ? URL.createObjectURL(file) : undefined; + setChatUploadDrafts(prev => [...prev, { id: draftId, name: file.name, percent: 0, previewUrl, sizeBytes: file.size }]); + + try { + const { promise } = uploadFileWithProgress( + '/chat/upload', + file, + (pct) => { + setChatUploadDrafts(prev => prev.map(d => d.id === draftId ? { ...d, percent: pct >= 101 ? 100 : pct } : d)); + }, + id ? 
{ agent_id: id } : undefined, + ); + const data = await promise; + setAttachedFiles(prev => [...prev, { name: data.filename, text: data.extracted_text, path: data.workspace_path, imageUrl: data.image_data_url || undefined }]); + } catch (err: any) { + if (err?.message !== 'Upload cancelled') { + alert(err?.message || t('agent.upload.failed')); + } + } finally { + if (previewUrl) URL.revokeObjectURL(previewUrl); + setChatUploadDrafts(prev => prev.filter(d => d.id !== draftId)); + } + } + }, [id, wsConnected, chatUploadDrafts.length, isWaiting, isStreaming, attachedFiles.length, isWritableSession, t]); + + const { isDragging: isChatDragging, dropZoneProps: chatDropProps } = useDropZone({ + onDrop: handleDroppedChatFiles, + disabled: !wsConnected || chatUploadDrafts.length > 0 || isWaiting || isStreaming || attachedFiles.length >= 10 || !activeSession || !isWritableSession(activeSession), + }); + // Expandable activity log const [expandedLogId, setExpandedLogId] = useState(null); const [logFilter, setLogFilter] = useState('user'); // 'user' | 'backend' | 'heartbeat' | 'schedule' | 'messages' @@ -4052,55 +4127,29 @@ function AgentDetailInner() { )} {!canViewAllAgentChatSessions && ( -
+
)}
{canViewAllAgentChatSessions && ( -
+
{ if (e.key === 'Enter' || e.key === ' ') { @@ -4115,7 +4164,7 @@ function AgentDetailInner() { role="tab" tabIndex={0} aria-selected={chatScope === 'all'} - className={`tab ${chatScope === 'all' ? 'active' : ''}`} + className={`segment-item ${chatScope === 'all' ? 'active' : ''}`} onClick={onAdminTabOthers} onKeyDown={(e) => { if (e.key === 'Enter' || e.key === ' ') { @@ -4137,40 +4186,13 @@ function AgentDetailInner() {
)} @@ -4192,24 +4214,25 @@ function AgentDetailInner() { return (
{ setChatScope('mine'); selectSession(s, 'mine'); }} className="session-item" - style={{ padding: '8px 12px', cursor: 'pointer', borderLeft: isActive ? '2px solid var(--accent-primary)' : '2px solid transparent', background: isActive ? 'var(--bg-secondary)' : 'transparent', marginBottom: '1px', position: 'relative' }} - onMouseEnter={e => { if (!isActive) e.currentTarget.style.background = 'var(--bg-secondary)'; const btn = e.currentTarget.querySelector('.del-btn') as HTMLElement; if (btn) btn.style.opacity = '0.5'; }} - onMouseLeave={e => { if (!isActive) e.currentTarget.style.background = 'transparent'; const btn = e.currentTarget.querySelector('.del-btn') as HTMLElement; if (btn) btn.style.opacity = '0'; }}> -
-
{s.title}
- {chLabel && {chLabel}} -
-
- {s.last_message_at - ? new Date(s.last_message_at).toLocaleString(i18n.language === 'zh' ? 'zh-CN' : 'en-US', { month: 'short', day: 'numeric', hour: '2-digit', minute: '2-digit' }) - : new Date(s.created_at).toLocaleString(i18n.language === 'zh' ? 'zh-CN' : 'en-US', { month: 'short', day: 'numeric' })} - {s.message_count > 0 && {s.message_count}} + style={{ padding: '8px 12px', cursor: 'pointer', borderLeft: isActive ? '2px solid var(--accent-primary)' : '2px solid transparent', background: isActive ? 'var(--bg-secondary)' : 'transparent', marginBottom: '1px', display: 'flex', alignItems: 'center', gap: '4px' }} + onMouseEnter={e => { if (!isActive) e.currentTarget.style.background = 'var(--bg-secondary)'; }} + onMouseLeave={e => { if (!isActive) e.currentTarget.style.background = 'transparent'; }}> +
+
+
{s.title}
+ {chLabel && {chLabel}} +
+
+ {s.last_message_at + ? new Date(s.last_message_at).toLocaleString(i18n.language === 'zh' ? 'zh-CN' : 'en-US', { month: 'short', day: 'numeric', hour: '2-digit', minute: '2-digit' }) + : new Date(s.created_at).toLocaleString(i18n.language === 'zh' ? 'zh-CN' : 'en-US', { month: 'short', day: 'numeric' })} + {s.message_count > 0 && {s.message_count}} +
- +
); })} @@ -4221,7 +4244,7 @@ function AgentDetailInner() { { + const wantEnable = e.target.checked; + if (wantEnable) { + const confirmed = window.confirm( + t('enterprise.a2aAsync.enableWarning', + [ + '⚠️ You are about to enable the A2A Async Communication feature (Beta).', + '', + 'This feature allows agents to communicate asynchronously via notify and task_delegate modes.', + '', + 'Known potential issues:', + '• Agent replies may contain internal technical terms (trigger names, focus items, etc.)', + '• task_delegate callbacks may occasionally be delayed or dropped due to rate limiting', + '• Token consumption will increase because each async message triggers a separate agent session', + '• Agent loops may occur if triggers are not properly configured', + '', + 'If you encounter any issues, please return to this page and disable the toggle to restore stable synchronous behavior.', + '', + 'Are you sure you want to enable this feature?' + ].join('\n') + ) + ); + if (!confirmed) return; + } + try { + await fetchJson(`/tenants/${selectedTenantId}`, { + method: 'PUT', + body: JSON.stringify({ a2a_async_enabled: wantEnable }), + }); + qc.invalidateQueries({ queryKey: ['tenant', selectedTenantId] }); + } catch (err: any) { + alert(err.message || 'Update failed'); + } + }} + style={{ opacity: 0, width: 0, height: 0 }} + /> + + + + +
+ + {currentTenant?.a2a_async_enabled + ? t('enterprise.a2aAsync.enabled', 'Enabled') + : t('enterprise.a2aAsync.disabled', 'Disabled') + } + +

+ {currentTenant?.a2a_async_enabled + ? t('enterprise.a2aAsync.enabledHint', 'Agents can use notify, task_delegate, and consult modes.') + : t('enterprise.a2aAsync.disabledHint', 'All agent messages use synchronous consult mode.') + } +

+
+
+
+ {/* ── Danger Zone: Delete Company ── */}

{t('enterprise.dangerZone', 'Danger Zone')}

diff --git a/frontend/src/pages/Layout.tsx b/frontend/src/pages/Layout.tsx index 5e0be2860..c47b3c4eb 100644 --- a/frontend/src/pages/Layout.tsx +++ b/frontend/src/pages/Layout.tsx @@ -1,6 +1,6 @@ import { useState, useEffect, useLayoutEffect, useRef, useCallback } from 'react'; import { createPortal } from 'react-dom'; -import { Outlet, NavLink, useNavigate } from 'react-router-dom'; +import { Outlet, NavLink, useNavigate, useMatch } from 'react-router-dom'; import { useTranslation } from 'react-i18next'; import { useQuery, useQueryClient } from '@tanstack/react-query'; import { useAuthStore } from '../stores'; @@ -237,6 +237,9 @@ export default function Layout() { const { user, logout, setAuth } = useAuthStore(); const queryClient = useQueryClient(); const isChinese = i18n.language?.startsWith('zh'); + // Detect chat page: needs fixed-height main-content for inner scroll to work + const isChatPage = !!useMatch('/agents/:id/chat'); + const [showAccountSettings, setShowAccountSettings] = useState(false); const [showAccountMenu, setShowAccountMenu] = useState(false); const [showLanguageSubmenu, setShowLanguageSubmenu] = useState(false); @@ -1020,7 +1023,7 @@ export default function Layout() {
)} -
+
diff --git a/helm/clawith/Chart.yaml b/helm/clawith/Chart.yaml index c1010e469..0dde0c93e 100644 --- a/helm/clawith/Chart.yaml +++ b/helm/clawith/Chart.yaml @@ -3,7 +3,7 @@ name: clawith description: A Helm chart for Clawith application deployment type: application version: 1.0.0 -appVersion: "1.8.2" +appVersion: "1.8.3-beta" keywords: - clawith - agent diff --git a/restart.sh b/restart.sh index 12f6d4316..07db0dd2b 100755 --- a/restart.sh +++ b/restart.sh @@ -178,6 +178,10 @@ start_backend() { echo -e "${YELLOW}🚀 Starting backend...${NC}" cd "$BACKEND_DIR" + # Auto-run schema migrations via alembic + echo -e "${YELLOW}🔄 Running schema migrations...${NC}" + .venv/bin/alembic upgrade head 2>/dev/null || true + # Auto-run data migrations (idempotent) echo -e "${YELLOW}🔄 Running data migrations...${NC}" .venv/bin/python -m app.scripts.migrate_schedules_to_triggers || true
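For reviewers, the `[image_data:]` marker handling added to `AgentDetail.tsx` can be read as the following standalone sketch. The helper name `extractInlineImages` is ours, not part of the diff — the PR performs this inline in the message renderer, and additionally skips thumbnail collection when `msg.imageUrl` is already set so the same image is never rendered twice:

```typescript
// The backend persists inline-image markers in stored chat messages, e.g.
//   "[image_data:data:image/png;base64,iVBORw0KGgo=]"
// so multimodal context survives across turns. This sketch strips every
// marker from the display text and collects the data URLs for thumbnail
// rendering, so users never see raw base64 in the chat bubble.
const IMAGE_DATA_RE = /\[image_data:(data:image\/[^;]+;base64,[^\]]+)\]/g;

function extractInlineImages(content: string): { text: string; images: string[] } {
  const images: string[] = [];
  const text = content
    .replace(IMAGE_DATA_RE, (_match: string, dataUrl: string) => {
      images.push(dataUrl);
      return ''; // the marker itself never reaches the rendered bubble
    })
    .trim();
  return { text, images };
}

const { text, images } = extractInlineImages(
  'Here is the chart: [image_data:data:image/png;base64,iVBORw0KGgo=]'
);
// text   → "Here is the chart:"
// images → ["data:image/png;base64,iVBORw0KGgo="]
```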
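The guard logic in the new `handleDroppedChatFiles` callback — capping the composer at ten attachments and assigning each upload draft a short random id — distills to the sketch below. Names (`acceptDroppedFiles`, `newDraftId`, `MAX_ATTACHMENTS`) are illustrative; the diff inlines these computations:

```typescript
// The chat composer accepts at most 10 attachments per message, so a drop
// of N files is trimmed to the remaining slots before any upload starts.
const MAX_ATTACHMENTS = 10;

function acceptDroppedFiles<T>(dropped: T[], alreadyAttached: number): T[] {
  const availableSlots = Math.max(0, MAX_ATTACHMENTS - alreadyAttached);
  return dropped.slice(0, availableSlots);
}

// Each in-flight upload draft is keyed by a short random id, exactly as in
// the diff: base-36 digits of Math.random(), with the "0." prefix sliced off.
function newDraftId(): string {
  return Math.random().toString(36).slice(2, 9);
}
```

The base-36 id is cheap and collision-tolerant for a handful of concurrent drafts; it is only used to match progress callbacks to their draft row, never persisted.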