diff --git a/backend/app/api/dingtalk.py b/backend/app/api/dingtalk.py index 9a431ecf2..1c93c5545 100644 --- a/backend/app/api/dingtalk.py +++ b/backend/app/api/dingtalk.py @@ -57,7 +57,7 @@ async def configure_dingtalk_channel( existing.is_configured = True existing.extra_config = {**existing.extra_config, "connection_mode": conn_mode, "agent_id": dingtalk_agent_id} await db.flush() - + # Restart Stream client if in websocket mode if conn_mode == "websocket": from app.services.dingtalk_stream import dingtalk_stream_manager @@ -68,7 +68,7 @@ async def configure_dingtalk_channel( from app.services.dingtalk_stream import dingtalk_stream_manager import asyncio asyncio.create_task(dingtalk_stream_manager.stop_client(agent_id)) - + return ChannelConfigOut.model_validate(existing) config = ChannelConfig( @@ -145,8 +145,19 @@ async def process_dingtalk_message( conversation_id: str, conversation_type: str, session_webhook: str, + image_base64_list: list[str] | None = None, + saved_file_paths: list[str] | None = None, + sender_nick: str = "", + message_id: str = "", ): - """Process an incoming DingTalk bot message and reply via session webhook.""" + """Process an incoming DingTalk bot message and reply via session webhook. + + Args: + image_base64_list: List of base64-encoded image data URIs for vision LLM. + saved_file_paths: List of local file paths where media files were saved. + sender_nick: Display name of the sender from DingTalk. + message_id: DingTalk message ID (used for reactions). 
+ """ import json import httpx from datetime import datetime, timezone @@ -207,21 +218,150 @@ async def process_dingtalk_message( ) history = [{"role": m.role, "content": m.content} for m in reversed(history_r.scalars().all())] + # Re-hydrate historical images for multi-turn LLM context + from app.services.image_context import rehydrate_image_messages + history = rehydrate_image_messages(history, agent_id, max_images=3) + + # Build saved_content for DB (no base64 blobs, keep it display-friendly) + import re as _re_dt + _clean_text = _re_dt.sub( + r'\[image_data:data:image/[^;]+;base64,[A-Za-z0-9+/=]+\]', + "", user_text, + ).strip() + if saved_file_paths: + from pathlib import Path as _PathDT + _file_prefixes = "\n".join( + f"[file:{_PathDT(p).name}]" for p in saved_file_paths + ) + saved_content = f"{_file_prefixes}\n{_clean_text}".strip() if _clean_text else _file_prefixes + else: + saved_content = _clean_text or user_text + # Save user message db.add(ChatMessage( agent_id=agent_id, user_id=platform_user_id, - role="user", content=user_text, + role="user", content=saved_content, conversation_id=session_conv_id, )) sess.last_message_at = datetime.now(timezone.utc) await db.commit() + # Build LLM input text: for images, inject base64 markers so vision models can see them + llm_user_text = user_text + if image_base64_list: + image_markers = "\n".join( + f"[image_data:{uri}]" for uri in image_base64_list + ) + llm_user_text = f"{user_text}\n{image_markers}" if user_text else image_markers + + # ── Set up channel_file_sender so the agent can send files via DingTalk ── + from app.services.agent_tools import channel_file_sender as _cfs + from app.services.dingtalk_stream import ( + _upload_dingtalk_media, + _send_dingtalk_media_message, + ) + + # Load DingTalk credentials from ChannelConfig + _dt_cfg_r = await db.execute( + _select(ChannelConfig).where( + ChannelConfig.agent_id == agent_id, + ChannelConfig.channel_type == "dingtalk", + ) + ) + _dt_cfg = 
_dt_cfg_r.scalar_one_or_none() + _dt_app_key = _dt_cfg.app_id if _dt_cfg else None + _dt_app_secret = _dt_cfg.app_secret if _dt_cfg else None + + _cfs_token = None + if _dt_app_key and _dt_app_secret: + # Determine send target: group -> conversation_id, P2P -> sender_staff_id + _dt_target_id = conversation_id if conversation_type == "2" else sender_staff_id + _dt_conv_type = conversation_type + + async def _dingtalk_file_sender(file_path: str, msg: str = ""): + """Send a file/image/video via DingTalk proactive message API.""" + from pathlib import Path as _P + + _fp = _P(file_path) + _ext = _fp.suffix.lower() + + # Determine media type from extension + if _ext in (".jpg", ".jpeg", ".png", ".gif", ".bmp", ".webp"): + _media_type = "image" + elif _ext in (".mp4", ".mov", ".avi", ".mkv"): + _media_type = "video" + elif _ext in (".mp3", ".wav", ".ogg", ".amr", ".m4a"): + _media_type = "voice" + else: + _media_type = "file" + + # Upload media to DingTalk + _mid = await _upload_dingtalk_media( + _dt_app_key, _dt_app_secret, file_path, _media_type + ) + + if _mid: + # Send via proactive message API + _ok = await _send_dingtalk_media_message( + _dt_app_key, _dt_app_secret, + _dt_target_id, _mid, _media_type, + _dt_conv_type, filename=_fp.name, + ) + if _ok: + # Also send accompany text if provided + if msg: + try: + async with httpx.AsyncClient(timeout=10) as _cl: + await _cl.post(session_webhook, json={ + "msgtype": "text", + "text": {"content": msg}, + }) + except Exception: + pass + return + + # Fallback: send a text message with file info + _fallback_parts = [] + if msg: + _fallback_parts.append(msg) + _fallback_parts.append(f"[File: {_fp.name}]") + try: + async with httpx.AsyncClient(timeout=10) as _cl: + await _cl.post(session_webhook, json={ + "msgtype": "text", + "text": {"content": "\n\n".join(_fallback_parts)}, + }) + except Exception as _fb_err: + logger.error(f"[DingTalk] Fallback file text also failed: {_fb_err}") + + _cfs_token = 
_cfs.set(_dingtalk_file_sender) + # Call LLM - reply_text = await _call_agent_llm( - db, agent_id, user_text, - history=history, user_id=platform_user_id, + try: + reply_text = await _call_agent_llm( + db, agent_id, llm_user_text, + history=history, user_id=platform_user_id, + ) + finally: + # Reset ContextVar + if _cfs_token is not None: + _cfs.reset(_cfs_token) + # Recall thinking reaction (before sending reply) + if message_id and _dt_app_key: + try: + from app.services.dingtalk_reaction import recall_thinking_reaction + await recall_thinking_reaction( + _dt_app_key, _dt_app_secret, + message_id, conversation_id, + ) + except Exception as _recall_err: + logger.warning(f"[DingTalk] Failed to recall thinking reaction: {_recall_err}") + + has_media = bool(image_base64_list or saved_file_paths) + logger.info( + f"[DingTalk] LLM reply ({'media' if has_media else 'text'} input): " + f"{reply_text[:100]}" ) - logger.info(f"[DingTalk] LLM reply: {reply_text[:100]}") # Reply via session webhook (markdown) try: diff --git a/backend/app/services/dingtalk_reaction.py b/backend/app/services/dingtalk_reaction.py new file mode 100644 index 000000000..4899de78b --- /dev/null +++ b/backend/app/services/dingtalk_reaction.py @@ -0,0 +1,110 @@ +"""DingTalk emotion reaction service — "thinking" indicator on user messages.""" + +import asyncio +from loguru import logger +from app.services.dingtalk_token import dingtalk_token_manager + + +async def add_thinking_reaction( + app_key: str, + app_secret: str, + message_id: str, + conversation_id: str, +) -> bool: + """Add "🤔思考中" reaction to a user message. 
Fire-and-forget, never raises.""" + import httpx + + if not message_id or not conversation_id or not app_key: + return False + + try: + token = await dingtalk_token_manager.get_token(app_key, app_secret) + if not token: + logger.warning("[DingTalk Reaction] Failed to get access token") + return False + + async with httpx.AsyncClient(timeout=5) as client: + resp = await client.post( + "https://api.dingtalk.com/v1.0/robot/emotion/reply", + headers={ + "x-acs-dingtalk-access-token": token, + "Content-Type": "application/json", + }, + json={ + "robotCode": app_key, + "openMsgId": message_id, + "openConversationId": conversation_id, + "emotionType": 2, + "emotionName": "🤔思考中", + "textEmotion": { + "emotionId": "2659900", + "emotionName": "🤔思考中", + "text": "🤔思考中", + "backgroundId": "im_bg_1", + }, + }, + ) + if resp.status_code == 200: + logger.info(f"[DingTalk Reaction] Thinking reaction added for msg {message_id[:16]}") + return True + else: + logger.warning(f"[DingTalk Reaction] Add failed: {resp.status_code} {resp.text[:200]}") + return False + except Exception as e: + logger.warning(f"[DingTalk Reaction] Add thinking reaction error: {e}") + return False + + +async def recall_thinking_reaction( + app_key: str, + app_secret: str, + message_id: str, + conversation_id: str, +) -> None: + """Recall "🤔思考中" reaction with retry (0ms, 1500ms, 5000ms). 
Fire-and-forget.""" + import httpx + + if not message_id or not conversation_id or not app_key: + return + + delays = [0, 1.5, 5.0] + + for delay in delays: + if delay > 0: + await asyncio.sleep(delay) + + try: + token = await dingtalk_token_manager.get_token(app_key, app_secret) + if not token: + continue + + async with httpx.AsyncClient(timeout=5) as client: + resp = await client.post( + "https://api.dingtalk.com/v1.0/robot/emotion/recall", + headers={ + "x-acs-dingtalk-access-token": token, + "Content-Type": "application/json", + }, + json={ + "robotCode": app_key, + "openMsgId": message_id, + "openConversationId": conversation_id, + "emotionType": 2, + "emotionName": "🤔思考中", + "textEmotion": { + "emotionId": "2659900", + "emotionName": "🤔思考中", + "text": "🤔思考中", + "backgroundId": "im_bg_1", + }, + }, + ) + if resp.status_code == 200: + logger.info(f"[DingTalk Reaction] Thinking reaction recalled for msg {message_id[:16]}") + return + else: + logger.warning(f"[DingTalk Reaction] Recall attempt failed: {resp.status_code}") + except Exception as e: + logger.warning(f"[DingTalk Reaction] Recall error: {e}") + + logger.warning(f"[DingTalk Reaction] All recall attempts failed for msg {message_id[:16]}") diff --git a/backend/app/services/dingtalk_service.py b/backend/app/services/dingtalk_service.py index 010fc56a4..d2b70f28b 100644 --- a/backend/app/services/dingtalk_service.py +++ b/backend/app/services/dingtalk_service.py @@ -155,3 +155,20 @@ async def send_dingtalk_message( if not agent_id: agent_id = app_id return await send_dingtalk_corp_conversation(app_id, app_secret, user_id, msg_body, agent_id) + + +async def download_dingtalk_media( + app_id: str, app_secret: str, download_code: str +) -> bytes | None: + """Download a media file from DingTalk using a downloadCode. + + Convenience wrapper that delegates to the stream module's download helper. + Returns raw file bytes on success, or None on failure. + + Args: + app_id: DingTalk app key (robotCode). 
+ app_secret: DingTalk app secret. + download_code: The downloadCode from the incoming message payload. + """ + from app.services.dingtalk_stream import download_dingtalk_media as _download + return await _download(app_id, app_secret, download_code) diff --git a/backend/app/services/dingtalk_stream.py b/backend/app/services/dingtalk_stream.py index 28a8ba8e2..73cf80b20 100644 --- a/backend/app/services/dingtalk_stream.py +++ b/backend/app/services/dingtalk_stream.py @@ -5,15 +5,401 @@ """ import asyncio +import base64 +import json import threading import uuid -from typing import Dict +from pathlib import Path +from typing import Dict, List, Optional, Tuple +import httpx from loguru import logger from sqlalchemy import select +from app.config import get_settings from app.database import async_session from app.models.channel_config import ChannelConfig +from app.services.dingtalk_token import dingtalk_token_manager + + +# ─── DingTalk Media Helpers ───────────────────────────── + + +async def _get_media_download_url( + access_token: str, download_code: str, robot_code: str +) -> Optional[str]: + """Get media file download URL from DingTalk API.""" + try: + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.post( + "https://api.dingtalk.com/v1.0/robot/messageFiles/download", + headers={"x-acs-dingtalk-access-token": access_token}, + json={"downloadCode": download_code, "robotCode": robot_code}, + ) + data = resp.json() + url = data.get("downloadUrl") + if url: + return url + logger.error(f"[DingTalk] Failed to get download URL: {data}") + return None + except Exception as e: + logger.error(f"[DingTalk] Error getting download URL: {e}") + return None + + +async def _download_file(url: str) -> Optional[bytes]: + """Download a file from a URL and return its bytes.""" + try: + async with httpx.AsyncClient(timeout=60, follow_redirects=True) as client: + resp = await client.get(url) + resp.raise_for_status() + return resp.content + except Exception as 
e: + logger.error(f"[DingTalk] Error downloading file: {e}") + return None + + +async def download_dingtalk_media( + app_key: str, app_secret: str, download_code: str +) -> Optional[bytes]: + """Download a media file from DingTalk using downloadCode. + + Steps: get access_token -> get download URL -> download file bytes. + """ + access_token = await dingtalk_token_manager.get_token(app_key, app_secret) + if not access_token: + return None + + download_url = await _get_media_download_url(access_token, download_code, app_key) + if not download_url: + return None + + return await _download_file(download_url) + + +def _resolve_upload_dir(agent_id: uuid.UUID) -> Path: + """Get the uploads directory for an agent, creating it if needed.""" + settings = get_settings() + upload_dir = Path(settings.AGENT_DATA_DIR) / str(agent_id) / "workspace" / "uploads" + upload_dir.mkdir(parents=True, exist_ok=True) + return upload_dir + + +async def _process_media_message( + msg_data: dict, + app_key: str, + app_secret: str, + agent_id: uuid.UUID, +) -> Tuple[str, Optional[List[str]], Optional[List[str]]]: + """Process a DingTalk message and extract text + media info. 
+ + Returns: + (user_text, image_base64_list, saved_file_paths) + - user_text: text content for the LLM (may include markers) + - image_base64_list: list of base64-encoded image data URIs, or None + - saved_file_paths: list of saved file paths, or None + """ + msgtype = msg_data.get("msgtype", "text") + logger.info(f"[DingTalk] Processing message type: {msgtype}") + + image_base64_list: List[str] = [] + saved_file_paths: List[str] = [] + + if msgtype == "text": + text_content = msg_data.get("text", {}).get("content", "").strip() + return text_content, None, None + + elif msgtype == "picture": + download_code = msg_data.get("content", {}).get("downloadCode", "") + if not download_code: + download_code = msg_data.get("downloadCode", "") + if not download_code: + logger.warning("[DingTalk] Picture message without downloadCode") + return "[User sent an image, but it could not be downloaded]", None, None + + file_bytes = await download_dingtalk_media(app_key, app_secret, download_code) + if not file_bytes: + return "[User sent an image, but download failed]", None, None + + upload_dir = _resolve_upload_dir(agent_id) + filename = f"dingtalk_img_{uuid.uuid4().hex[:8]}.jpg" + save_path = upload_dir / filename + save_path.write_bytes(file_bytes) + logger.info(f"[DingTalk] Saved image to {save_path} ({len(file_bytes)} bytes)") + + b64_data = base64.b64encode(file_bytes).decode("ascii") + image_marker = f"[image_data:data:image/jpeg;base64,{b64_data}]" + return ( + f"[User sent an image]\n{image_marker}", + [f"data:image/jpeg;base64,{b64_data}"], + [str(save_path)], + ) + + elif msgtype == "richText": + rich_text = msg_data.get("content", {}).get("richText", []) + text_parts: List[str] = [] + + for section in rich_text: + for item in section if isinstance(section, list) else [section]: + if "text" in item: + text_parts.append(item["text"]) + elif "downloadCode" in item: + file_bytes = await download_dingtalk_media( + app_key, app_secret, item["downloadCode"] + ) + if 
file_bytes: + upload_dir = _resolve_upload_dir(agent_id) + filename = f"dingtalk_richimg_{uuid.uuid4().hex[:8]}.jpg" + save_path = upload_dir / filename + save_path.write_bytes(file_bytes) + logger.info(f"[DingTalk] Saved rich text image to {save_path}") + + b64_data = base64.b64encode(file_bytes).decode("ascii") + image_marker = f"[image_data:data:image/jpeg;base64,{b64_data}]" + text_parts.append(image_marker) + image_base64_list.append(f"data:image/jpeg;base64,{b64_data}") + saved_file_paths.append(str(save_path)) + + combined_text = "\n".join(text_parts).strip() + if not combined_text: + combined_text = "[User sent a rich text message]" + + return ( + combined_text, + image_base64_list if image_base64_list else None, + saved_file_paths if saved_file_paths else None, + ) + + elif msgtype == "audio": + content = msg_data.get("content", {}) + recognition = content.get("recognition", "") + if recognition: + logger.info(f"[DingTalk] Audio with recognition: {recognition[:80]}") + return f"[Voice message] {recognition}", None, None + + download_code = content.get("downloadCode", "") + if download_code: + file_bytes = await download_dingtalk_media(app_key, app_secret, download_code) + if file_bytes: + upload_dir = _resolve_upload_dir(agent_id) + duration = content.get("duration", "unknown") + filename = f"dingtalk_audio_{uuid.uuid4().hex[:8]}.amr" + save_path = upload_dir / filename + save_path.write_bytes(file_bytes) + logger.info(f"[DingTalk] Saved audio to {save_path} ({len(file_bytes)} bytes)") + return ( + f"[User sent a voice message, duration {duration}ms, saved to {filename}]", + None, + [str(save_path)], + ) + return "[User sent a voice message, but it could not be processed]", None, None + + elif msgtype == "video": + content = msg_data.get("content", {}) + download_code = content.get("downloadCode", "") + if download_code: + file_bytes = await download_dingtalk_media(app_key, app_secret, download_code) + if file_bytes: + upload_dir = 
_resolve_upload_dir(agent_id) + duration = content.get("duration", "unknown") + filename = f"dingtalk_video_{uuid.uuid4().hex[:8]}.mp4" + save_path = upload_dir / filename + save_path.write_bytes(file_bytes) + logger.info(f"[DingTalk] Saved video to {save_path} ({len(file_bytes)} bytes)") + return ( + f"[User sent a video, duration {duration}ms, saved to {filename}]", + None, + [str(save_path)], + ) + return "[User sent a video, but it could not be downloaded]", None, None + + elif msgtype == "file": + content = msg_data.get("content", {}) + download_code = content.get("downloadCode", "") + original_filename = content.get("fileName", "unknown_file") + if download_code: + file_bytes = await download_dingtalk_media(app_key, app_secret, download_code) + if file_bytes: + upload_dir = _resolve_upload_dir(agent_id) + safe_name = f"dingtalk_{uuid.uuid4().hex[:8]}_{original_filename}" + save_path = upload_dir / safe_name + save_path.write_bytes(file_bytes) + logger.info( + f"[DingTalk] Saved file '{original_filename}' to {save_path} " + f"({len(file_bytes)} bytes)" + ) + return ( + f"[file:{original_filename}]", + None, + [str(save_path)], + ) + return f"[User sent file {original_filename}, but it could not be downloaded]", None, None + + else: + logger.warning(f"[DingTalk] Unsupported message type: {msgtype}") + return f"[User sent a {msgtype} message, which is not yet supported]", None, None + + +# ─── DingTalk Media Upload & Send ─────────────────────── + +async def _upload_dingtalk_media( + app_key: str, + app_secret: str, + file_path: str, + media_type: str = "file", +) -> Optional[str]: + """Upload a media file to DingTalk and return the mediaId. + + Args: + app_key: DingTalk app key (robotCode). + app_secret: DingTalk app secret. + file_path: Local file path to upload. + media_type: One of 'image', 'voice', 'video', 'file'. + + Returns: + mediaId string on success, None on failure. 
+ """ + access_token = await dingtalk_token_manager.get_token(app_key, app_secret) + if not access_token: + return None + + file_p = Path(file_path) + if not file_p.exists(): + logger.error(f"[DingTalk] Upload failed: file not found: {file_path}") + return None + + try: + file_bytes = file_p.read_bytes() + async with httpx.AsyncClient(timeout=60) as client: + # Use the legacy oapi endpoint which is more reliable and widely supported. + upload_url = ( + f"https://oapi.dingtalk.com/media/upload" + f"?access_token={access_token}&type={media_type}" + ) + resp = await client.post( + upload_url, + files={"media": (file_p.name, file_bytes)}, + ) + data = resp.json() + # Legacy API returns media_id (snake_case), new API returns mediaId + media_id = data.get("media_id") or data.get("mediaId") + if media_id and data.get("errcode", 0) == 0: + logger.info( + f"[DingTalk] Uploaded {media_type} '{file_p.name}' -> mediaId={media_id[:20]}..." + ) + return media_id + logger.error(f"[DingTalk] Upload failed: {data}") + return None + except Exception as e: + logger.error(f"[DingTalk] Upload error: {e}") + return None + + +async def _send_dingtalk_media_message( + app_key: str, + app_secret: str, + target_id: str, + media_id: str, + media_type: str, + conversation_type: str, + filename: Optional[str] = None, +) -> bool: + """Send a media message via DingTalk proactive message API. + + Args: + app_key: DingTalk app key (robotCode). + app_secret: DingTalk app secret. + target_id: For P2P: sender_staff_id; For group: openConversationId. + media_id: The mediaId from upload. + media_type: One of 'image', 'voice', 'video', 'file'. + conversation_type: '1' for P2P, '2' for group. + filename: Original filename (used for file/video types). + + Returns: + True on success, False on failure. 
+ """ + access_token = await dingtalk_token_manager.get_token(app_key, app_secret) + if not access_token: + return False + + headers = {"x-acs-dingtalk-access-token": access_token} + + # Build msgKey and msgParam based on media_type + if media_type == "image": + msg_key = "sampleImageMsg" + msg_param = json.dumps({"photoURL": media_id}) + elif media_type == "voice": + msg_key = "sampleAudio" + msg_param = json.dumps({"mediaId": media_id, "duration": "3000"}) + elif media_type == "video": + safe_name = filename or "video.mp4" + ext = Path(safe_name).suffix.lstrip(".") or "mp4" + msg_key = "sampleFile" + msg_param = json.dumps({ + "mediaId": media_id, + "fileName": safe_name, + "fileType": ext, + }) + else: + # file + safe_name = filename or "file" + ext = Path(safe_name).suffix.lstrip(".") or "bin" + msg_key = "sampleFile" + msg_param = json.dumps({ + "mediaId": media_id, + "fileName": safe_name, + "fileType": ext, + }) + + try: + async with httpx.AsyncClient(timeout=15) as client: + if conversation_type == "2": + # Group chat + resp = await client.post( + "https://api.dingtalk.com/v1.0/robot/groupMessages/send", + headers=headers, + json={ + "robotCode": app_key, + "openConversationId": target_id, + "msgKey": msg_key, + "msgParam": msg_param, + }, + ) + else: + # P2P chat + resp = await client.post( + "https://api.dingtalk.com/v1.0/robot/oToMessages/batchSend", + headers=headers, + json={ + "robotCode": app_key, + "userIds": [target_id], + "msgKey": msg_key, + "msgParam": msg_param, + }, + ) + + data = resp.json() + if resp.status_code >= 400 or data.get("errcode"): + logger.error(f"[DingTalk] Send media failed: {data}") + return False + + logger.info( + f"[DingTalk] Sent {media_type} message to {target_id[:16]}... 
" + f"(conv_type={conversation_type})" + ) + return True + except Exception as e: + logger.error(f"[DingTalk] Send media error: {e}") + return False + + +# ─── Stream Manager ───────────────────────────────────── + + +def _fire_and_forget(loop, coro): + """Schedule a coroutine on the main loop and log any unhandled exception.""" + future = asyncio.run_coroutine_threadsafe(coro, loop) + future.add_done_callback(lambda f: f.exception() if not f.cancelled() else None) class DingTalkStreamManager: @@ -67,47 +453,72 @@ def _run_client_thread( app_secret: str, stop_event: threading.Event, ): - """Run the DingTalk Stream client in a blocking thread.""" + """Run the DingTalk Stream client with auto-reconnect.""" try: import dingtalk_stream + except ImportError: + logger.warning( + "[DingTalk Stream] dingtalk-stream package not installed. " + "Install with: pip install dingtalk-stream" + ) + self._threads.pop(agent_id, None) + self._stop_events.pop(agent_id, None) + return - # Reference to manager's main loop for async dispatch - main_loop = self._main_loop - - class ClawithChatbotHandler(dingtalk_stream.ChatbotHandler): - """Custom handler that dispatches messages to the Clawith LLM pipeline.""" - - async def process(self, callback: dingtalk_stream.CallbackMessage): - """Handle incoming bot message from DingTalk Stream. - - NOTE: The SDK invokes this method in the thread's own asyncio loop, - so we must dispatch to the main FastAPI loop for DB + LLM work. 
- """ - try: - # Parse the raw data into a ChatbotMessage via class method - incoming = dingtalk_stream.ChatbotMessage.from_dict(callback.data) - - # Extract text content + MAX_RETRIES = 5 + RETRY_DELAYS = [2, 5, 15, 30, 60] # exponential backoff, seconds + + # Reference to manager's main loop for async dispatch + main_loop = self._main_loop + retries = 0 + manager_self = self + + class ClawithChatbotHandler(dingtalk_stream.ChatbotHandler): + """Custom handler that dispatches messages to the Clawith LLM pipeline.""" + + async def process(self, callback: dingtalk_stream.CallbackMessage): + """Handle incoming bot message from DingTalk Stream. + + NOTE: The SDK invokes this method in the thread's own asyncio loop, + so we must dispatch to the main FastAPI loop for DB + LLM work. + """ + try: + # Parse the raw data + incoming = dingtalk_stream.ChatbotMessage.from_dict(callback.data) + msg_data = callback.data if isinstance(callback.data, dict) else json.loads(callback.data) + + msgtype = msg_data.get("msgtype", "text") + sender_staff_id = incoming.sender_staff_id or incoming.sender_id or "" + sender_nick = incoming.sender_nick or "" + message_id = incoming.message_id or "" + conversation_id = incoming.conversation_id or "" + conversation_type = incoming.conversation_type or "1" + session_webhook = incoming.session_webhook or "" + + logger.info( + f"[DingTalk Stream] Received {msgtype} message from {sender_staff_id}" + ) + + if msgtype == "text": + # Plain text: use existing logic text_list = incoming.get_text_list() user_text = " ".join(text_list).strip() if text_list else "" - if not user_text: return dingtalk_stream.AckMessage.STATUS_OK, "empty message" - sender_staff_id = incoming.sender_staff_id or incoming.sender_id or "" - conversation_id = incoming.conversation_id or "" - conversation_type = incoming.conversation_type or "1" - session_webhook = incoming.session_webhook or "" - logger.info( - f"[DingTalk Stream] Message from 
[{incoming.sender_nick}]{sender_staff_id}: {user_text[:80]}" + f"[DingTalk Stream] Text from {sender_staff_id}: {user_text[:80]}" ) - # Dispatch to the main FastAPI event loop for DB + LLM processing from app.api.dingtalk import process_dingtalk_message if main_loop and main_loop.is_running(): - future = asyncio.run_coroutine_threadsafe( + # Add thinking reaction immediately + from app.services.dingtalk_reaction import add_thinking_reaction + _fire_and_forget(main_loop, + add_thinking_reaction(app_key, app_secret, message_id, conversation_id)) + + _fire_and_forget(main_loop, process_dingtalk_message( agent_id=agent_id, sender_staff_id=sender_staff_id, @@ -115,50 +526,134 @@ async def process(self, callback: dingtalk_stream.CallbackMessage): conversation_id=conversation_id, conversation_type=conversation_type, session_webhook=session_webhook, - ), - main_loop, - ) - # Wait for result (with timeout) - try: - future.result(timeout=120) - except Exception as e: - logger.error(f"[DingTalk Stream] LLM processing error: {e}") - import traceback - traceback.print_exc() + sender_nick=sender_nick, + message_id=message_id, + )) else: - logger.warning("[DingTalk Stream] Main loop not available for dispatch") - - return dingtalk_stream.AckMessage.STATUS_OK, "ok" - except Exception as e: - logger.error(f"[DingTalk Stream] Error in message handler: {e}") - import traceback - traceback.print_exc() - return dingtalk_stream.AckMessage.STATUS_SYSTEM_EXCEPTION, str(e) - - credential = dingtalk_stream.Credential(client_id=app_key, client_secret=app_secret) - client = dingtalk_stream.DingTalkStreamClient(credential=credential) - client.register_callback_handler( - dingtalk_stream.chatbot.ChatbotMessage.TOPIC, - ClawithChatbotHandler(), - ) + logger.warning("[DingTalk Stream] Main loop not available") - logger.info(f"[DingTalk Stream] Connecting for agent {agent_id}...") - # start_forever() blocks until disconnected - client.start_forever() + else: + # Non-text message: process media in 
the main loop + if main_loop and main_loop.is_running(): + # Add thinking reaction immediately + from app.services.dingtalk_reaction import add_thinking_reaction + _fire_and_forget(main_loop, + add_thinking_reaction(app_key, app_secret, message_id, conversation_id)) + + _fire_and_forget(main_loop, + manager_self._handle_media_and_dispatch( + msg_data=msg_data, + app_key=app_key, + app_secret=app_secret, + agent_id=agent_id, + sender_staff_id=sender_staff_id, + conversation_id=conversation_id, + conversation_type=conversation_type, + session_webhook=session_webhook, + sender_nick=sender_nick, + message_id=message_id, + )) + else: + logger.warning("[DingTalk Stream] Main loop not available") + + return dingtalk_stream.AckMessage.STATUS_OK, "ok" + except Exception as e: + logger.error(f"[DingTalk Stream] Error in message handler: {e}") + import traceback + traceback.print_exc() + return dingtalk_stream.AckMessage.STATUS_SYSTEM_EXCEPTION, str(e) + + while not stop_event.is_set() and retries <= MAX_RETRIES: + try: + credential = dingtalk_stream.Credential(client_id=app_key, client_secret=app_secret) + client = dingtalk_stream.DingTalkStreamClient(credential=credential) + client.register_callback_handler( + dingtalk_stream.chatbot.ChatbotMessage.TOPIC, + ClawithChatbotHandler(), + ) - except ImportError: - logger.warning( - "[DingTalk Stream] dingtalk-stream package not installed. " - "Install with: pip install dingtalk-stream" + logger.info( + f"[DingTalk Stream] Connecting for agent {agent_id}... " + f"(attempt {retries + 1}/{MAX_RETRIES + 1})" + ) + # start_forever() blocks until disconnected + client.start_forever() + + # start_forever returned: connection dropped + if stop_event.is_set(): + break # intentional stop, no retry + + # Reset retries on successful connection (ran for a while then disconnected) + retries = 0 + retries += 1 + logger.warning( + f"[DingTalk Stream] Connection lost for agent {agent_id}, will retry..." 
+ ) + + except Exception as e: + retries += 1 + logger.error( + f"[DingTalk Stream] Connection error for {agent_id} " + f"(attempt {retries}/{MAX_RETRIES + 1}): {e}" + ) + + if retries > MAX_RETRIES: + logger.error( + f"[DingTalk Stream] Agent {agent_id} exhausted all {MAX_RETRIES} retries, giving up" + ) + break + + delay = RETRY_DELAYS[min(retries - 1, len(RETRY_DELAYS) - 1)] + logger.info( + f"[DingTalk Stream] Retrying in {delay}s for agent {agent_id}..." ) - except Exception as e: - logger.error(f"[DingTalk Stream] Client error for {agent_id}: {e}") - import traceback - traceback.print_exc() - finally: - self._threads.pop(agent_id, None) - self._stop_events.pop(agent_id, None) - logger.info(f"[DingTalk Stream] Client stopped for agent {agent_id}") + # Use stop_event.wait so we exit immediately if stopped + if stop_event.wait(timeout=delay): + break # stop was requested during wait + + self._threads.pop(agent_id, None) + self._stop_events.pop(agent_id, None) + logger.info(f"[DingTalk Stream] Client stopped for agent {agent_id}") + + @staticmethod + async def _handle_media_and_dispatch( + msg_data: dict, + app_key: str, + app_secret: str, + agent_id: uuid.UUID, + sender_staff_id: str, + conversation_id: str, + conversation_type: str, + session_webhook: str, + sender_nick: str = "", + message_id: str = "", + ): + """Download media, then dispatch to process_dingtalk_message.""" + from app.api.dingtalk import process_dingtalk_message + + user_text, image_base64_list, saved_file_paths = await _process_media_message( + msg_data=msg_data, + app_key=app_key, + app_secret=app_secret, + agent_id=agent_id, + ) + + if not user_text: + logger.info("[DingTalk Stream] Empty content after media processing, skipping") + return + + await process_dingtalk_message( + agent_id=agent_id, + sender_staff_id=sender_staff_id, + user_text=user_text, + conversation_id=conversation_id, + conversation_type=conversation_type, + session_webhook=session_webhook, + 
class DingTalkTokenManager:
    """Global DingTalk access_token cache.

    - Tokens are cached per app_key as ``(token, expires_at)`` pairs, where
      ``expires_at`` is a ``time.time()`` timestamp.
    - A cached token is considered due for refresh
      ``REFRESH_MARGIN_SECONDS`` before its recorded expiry.
    - Refreshes for one app_key are serialized with an ``asyncio.Lock`` so
      concurrent callers issue a single HTTP request.
    - If a refresh fails while the previous token has not actually expired,
      that token is returned instead of ``None``.
    """

    # Endpoint used to obtain the access token.
    TOKEN_URL = "https://api.dingtalk.com/v1.0/oauth2/accessToken"
    # Lifetime (seconds) assumed when the response carries no "expireIn".
    DEFAULT_EXPIRES_IN = 7200
    # Refresh this many seconds before the recorded expiry.
    REFRESH_MARGIN_SECONDS = 300
    # Timeout (seconds) for the token HTTP request.
    REQUEST_TIMEOUT_SECONDS = 10

    def __init__(self):
        self._cache: Dict[str, Tuple[str, float]] = {}
        self._locks: Dict[str, asyncio.Lock] = {}

    def _get_lock(self, app_key: str) -> asyncio.Lock:
        """Return the lock guarding refreshes for *app_key*, creating it once."""
        if app_key not in self._locks:
            self._locks[app_key] = asyncio.Lock()
        return self._locks[app_key]

    def _cached_token(self, app_key: str, margin: float) -> Optional[str]:
        """Return the cached token if it stays valid for more than *margin* seconds."""
        entry = self._cache.get(app_key)
        if entry is None:
            return None
        token, expires_at = entry
        if time.time() < expires_at - margin:
            return token
        return None

    async def get_token(self, app_key: str, app_secret: str) -> Optional[str]:
        """Get access_token, returning the cached one while it is fresh.

        Args:
            app_key: DingTalk app key; also the cache key.
            app_secret: DingTalk app secret, sent only when a refresh is needed.

        Returns:
            The access token, or ``None`` when no token could be obtained and
            no unexpired cached token exists. Errors are logged, not raised.
        """
        margin = self.REFRESH_MARGIN_SECONDS

        # Fast path: no lock needed while the cached token is comfortably valid.
        token = self._cached_token(app_key, margin)
        if token:
            return token

        async with self._get_lock(app_key):
            # Another caller may have refreshed while we waited for the lock.
            token = self._cached_token(app_key, margin)
            if token:
                return token

            try:
                async with httpx.AsyncClient(timeout=self.REQUEST_TIMEOUT_SECONDS) as client:
                    resp = await client.post(
                        self.TOKEN_URL,
                        json={"appKey": app_key, "appSecret": app_secret},
                    )
                    data = resp.json()
                    token = data.get("accessToken")
                    expires_in = data.get("expireIn", self.DEFAULT_EXPIRES_IN)

                    if token:
                        self._cache[app_key] = (token, time.time() + expires_in)
                        logger.debug(f"[DingTalk Token] Refreshed for {app_key[:8]}..., expires in {expires_in}s")
                        return token

                    logger.error(f"[DingTalk Token] Failed to get token: {data}")
            except Exception as e:
                # Best-effort: callers receive None (or the old token) instead
                # of an exception.
                logger.error(f"[DingTalk Token] Error getting token: {e}")

            # The refresh failed. Inside the early-refresh window the previous
            # token is still valid, so hand it out rather than failing.
            fallback = self._cached_token(app_key, 0)
            if fallback:
                logger.warning(
                    f"[DingTalk Token] Refresh failed for {app_key[:8]}..., "
                    f"reusing cached token until it expires"
                )
            return fallback

    async def get_corp_token(self, app_key: str, app_secret: str) -> Optional[str]:
        """Get a token for corp API calls such as /topapi/v2/user/get.

        No separate request is made: this delegates to ``get_token`` and
        therefore shares its cache, on the premise that the v1.0 OAuth2 token
        is also accepted by the corp APIs.
        """
        # The v1.0 OAuth2 token also works for corp APIs, so reuse it
        return await self.get_token(app_key, app_secret)
# File suffixes (lower-case, with dot) eligible for re-hydration.
IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp'}
# Attachment tag such as "[file:photo.jpg]"; group 1 is the file name.
FILE_PATTERN = re.compile(r'\[file:([^\]]+)\]')
# Inline marker "[image_data:data:image/<type>;base64,<payload>]".
IMAGE_DATA_PATTERN = re.compile(
    r'\[image_data:data:image/[^;]+;base64,[A-Za-z0-9+/=]+\]'
)
MAX_IMAGE_BYTES = 5 * 1024 * 1024  # 5MB per image


def _first_image_filename(content: str) -> Optional[str]:
    """Return the first ``[file:...]`` name in *content* with an image suffix."""
    for match in FILE_PATTERN.finditer(content):
        filename = match.group(1)
        if Path(filename).suffix.lower() in IMAGE_EXTENSIONS:
            return filename
    return None


def rehydrate_image_messages(
    messages: list[dict],
    agent_id,
    max_images: int = 3,
) -> list[dict]:
    """Scan history for [file:xxx.jpg] and inject base64 image data for LLM.

    History is walked newest-first and at most ``max_images`` user messages
    are enriched, to limit context size and cost. For each selected message
    the image is read from the agent's ``workspace/uploads`` directory and an
    ``[image_data:data:<mime>;base64,...]`` marker is appended to its content.

    Args:
        messages: List of {"role": ..., "content": ...} dicts.
        agent_id: Agent UUID used to locate the uploads directory.
        max_images: Max number of historical images to re-hydrate; values
            of 0 or less disable re-hydration.

    Returns:
        ``messages`` itself when nothing qualifies; otherwise a new list in
        which the enriched messages are replaced by copies. Messages that
        already contain an ``[image_data:`` marker, non-user messages and
        non-string contents are left untouched. Missing, oversized or
        unreadable files are logged and skipped.
    """
    if max_images <= 0:
        return messages

    # Collect (index, filename) pairs, newest message first.
    candidates: list[tuple[int, str]] = []
    for idx in reversed(range(len(messages))):
        msg = messages[idx]
        if msg.get("role") != "user":
            continue
        content = msg.get("content", "")
        if not isinstance(content, str):
            continue
        # Skip if already has image_data (current turn)
        if "[image_data:" in content:
            continue
        # Look past non-image attachments so a leading "[file:doc.pdf]" does
        # not hide an image attached to the same message.
        filename = _first_image_filename(content)
        if filename is None:
            continue
        candidates.append((idx, filename))
        if len(candidates) >= max_images:
            break

    if not candidates:
        return messages

    # Resolved only once there is something to load.
    settings = get_settings()
    upload_dir = (
        Path(settings.AGENT_DATA_DIR) / str(agent_id) / "workspace" / "uploads"
    )

    result = list(messages)  # shallow copy; the caller's list is not mutated
    rehydrated = 0

    for idx, filename in candidates:
        try:
            # The name comes from stored message text, which users control.
            # Resolve it and refuse anything outside the uploads directory
            # ("../", absolute paths, symlinks pointing elsewhere).
            file_path = (upload_dir / filename).resolve()
            if not file_path.is_relative_to(upload_dir.resolve()):
                logger.warning(
                    f"[ImageContext] Ignoring file outside uploads dir: {filename}"
                )
                continue
            if not file_path.is_file():
                logger.warning(f"[ImageContext] File not found: {file_path}")
                continue

            # Check the size before loading the bytes into memory.
            size = file_path.stat().st_size
            if size > MAX_IMAGE_BYTES:
                logger.info(
                    f"[ImageContext] Skipping large image: "
                    f"{filename} ({size} bytes)"
                )
                continue

            b64 = base64.b64encode(file_path.read_bytes()).decode("ascii")
            ext = Path(filename).suffix.lower().lstrip('.')
            mime = f"image/{'jpeg' if ext == 'jpg' else ext}"
            marker = f"[image_data:data:{mime};base64,{b64}]"

            # Append image_data marker to existing content
            old_content = result[idx]["content"]
            result[idx] = {**result[idx], "content": f"{old_content}\n{marker}"}
            rehydrated += 1
            logger.debug(f"[ImageContext] Re-hydrated: {filename}")

        except Exception as e:
            # Best-effort: one bad file must not break the whole turn.
            logger.error(f"[ImageContext] Failed to read {filename}: {e}")

    if rehydrated > 0:
        logger.info(
            f"[ImageContext] Re-hydrated {rehydrated} image(s) "
            f"for agent {agent_id}"
        )

    return result