diff --git a/backend/app/api/dingtalk.py b/backend/app/api/dingtalk.py index 9a431ecf2..de5ea7949 100644 --- a/backend/app/api/dingtalk.py +++ b/backend/app/api/dingtalk.py @@ -20,6 +20,54 @@ router = APIRouter(tags=["dingtalk"]) +# ─── DingTalk Corp API helpers ────────────────────────── + +async def _get_dingtalk_user_detail( + app_key: str, + app_secret: str, + staff_id: str, +) -> dict | None: + """Query DingTalk user detail via corp API to get unionId/mobile/email. + + Uses /topapi/v2/user/get, requires contact.user.read permission. + Returns None on failure (graceful degradation). + """ + import httpx + from app.services.dingtalk_token import dingtalk_token_manager + + try: + access_token = await dingtalk_token_manager.get_corp_token(app_key, app_secret) + if not access_token: + return None + + async with httpx.AsyncClient(timeout=10) as client: + user_resp = await client.post( + "https://oapi.dingtalk.com/topapi/v2/user/get", + params={"access_token": access_token}, + json={"userid": staff_id, "language": "zh_CN"}, + ) + user_data = user_resp.json() + + if user_data.get("errcode") != 0: + logger.warning( + f"[DingTalk] /topapi/v2/user/get failed for {staff_id}: " + f"errcode={user_data.get('errcode')} errmsg={user_data.get('errmsg')}" + ) + return None + + result = user_data.get("result", {}) + return { + "unionid": result.get("unionid", ""), + "mobile": result.get("mobile", ""), + "email": result.get("email", "") or result.get("org_email", ""), + "name": result.get("name", ""), + } + + except Exception as e: + logger.warning(f"[DingTalk] _get_dingtalk_user_detail error for {staff_id}: {e}") + return None + + # ─── Config CRUD ──────────────────────────────────────── @router.post("/agents/{agent_id}/dingtalk-channel", response_model=ChannelConfigOut, status_code=201) @@ -145,6 +193,8 @@ async def process_dingtalk_message( conversation_id: str, conversation_type: str, session_webhook: str, + sender_nick: str = "", + sender_id: str = "", ): """Process an incoming DingTalk bot message and reply via session webhook.""" import json @@ -177,16 +227,65 @@ async def process_dingtalk_message( # P2P / single chat conv_id = f"dingtalk_p2p_{sender_staff_id}" - # Resolve channel user via unified service (uses OrgMember + SSO patterns) + # Build extra_info for user resolution using senderStaffId as primary key. + # Try to enrich with corp API data (unionid/mobile/email) for better + # cross-channel matching when the basic OrgMember lookup fails. + extra_info: dict = {"name": sender_nick} if sender_nick else {} + + # Load channel config for corp API calls + _cfg_r = await db.execute( + _select(ChannelConfig).where( + ChannelConfig.agent_id == agent_id, + ChannelConfig.channel_type == "dingtalk", + ) + ) + _cfg = _cfg_r.scalar_one_or_none() + + if _cfg and _cfg.app_id and _cfg.app_secret: + dt_detail = await _get_dingtalk_user_detail( + _cfg.app_id, _cfg.app_secret, sender_staff_id + ) + if dt_detail: + if dt_detail.get("unionid"): + extra_info["unionid"] = dt_detail["unionid"] + if dt_detail.get("mobile"): + extra_info["mobile"] = dt_detail["mobile"] + if dt_detail.get("email"): + extra_info["email"] = dt_detail["email"] + if dt_detail.get("name") and not sender_nick: + extra_info["name"] = dt_detail["name"] + logger.debug( + f"[DingTalk] Enriched user detail for {sender_staff_id}: " + f"unionid={dt_detail.get('unionid', '')[:8]}..." + ) + + # Resolve channel user via unified service (uses OrgMember + SSO patterns). + # senderStaffId is the stable enterprise userId — used as external_user_id. platform_user = await channel_user_service.resolve_channel_user( db=db, agent=agent_obj, channel_type="dingtalk", external_user_id=sender_staff_id, - extra_info={"unionid": sender_staff_id}, + extra_info=extra_info, ) platform_user_id = platform_user.id + # Check for channel commands (/new, /reset) + from app.services.channel_commands import is_channel_command, handle_channel_command + if is_channel_command(user_text): + cmd_result = await handle_channel_command( + db=db, command=user_text, agent_id=agent_id, + user_id=platform_user_id, external_conv_id=conv_id, + source_channel="dingtalk", + ) + await db.commit() + async with httpx.AsyncClient(timeout=10) as _cl_cmd: + await _cl_cmd.post(session_webhook, json={ + "msgtype": "text", + "text": {"content": cmd_result["message"]}, + }) + return + # Find or create session sess = await find_or_create_channel_session( db=db, diff --git a/backend/app/api/feishu.py b/backend/app/api/feishu.py index aff83664a..6235191e8 100644 --- a/backend/app/api/feishu.py +++ b/backend/app/api/feishu.py @@ -446,6 +446,23 @@ async def process_feishu_event(agent_id: uuid.UUID, body: dict, db: AsyncSession from app.models.agent import DEFAULT_CONTEXT_WINDOW_SIZE ctx_size = (agent_obj.context_window_size or DEFAULT_CONTEXT_WINDOW_SIZE) if agent_obj else DEFAULT_CONTEXT_WINDOW_SIZE + # Check for channel commands (/new, /reset) + from app.services.channel_commands import is_channel_command, handle_channel_command + if is_channel_command(user_text): + _cmd_result = await handle_channel_command( + db=db, command=user_text, agent_id=agent_id, + user_id=creator_id, external_conv_id=conv_id, + source_channel="feishu", + ) + await db.commit() + import json as _j_cmd + _cmd_reply = _j_cmd.dumps({"text": _cmd_result["message"]}) + if chat_type == "group" and chat_id: + await feishu_service.send_message(config.app_id, config.app_secret, chat_id, "text", _cmd_reply, receive_id_type="chat_id") + else: + await feishu_service.send_message(config.app_id, config.app_secret, sender_open_id, "text", _cmd_reply) + return {"code": 0, "msg": "command handled"} + # Pre-resolve session so history lookup uses the UUID (session created later if new) _pre_sess_r = await db.execute( select(__import__('app.models.chat_session', fromlist=['ChatSession']).ChatSession).where( @@ -1497,11 +1514,11 @@ async def _call_agent_llm( return f"⚠️ {agent.name} 未配置 LLM 模型,请在管理后台设置。" # Build conversation messages (without system prompt — call_llm adds it) + # NOTE: history is already truncated to ctx_size by the SQL .limit() in + # each caller, so no further slicing is needed here. messages: list[dict] = [] - from app.models.agent import DEFAULT_CONTEXT_WINDOW_SIZE - ctx_size = agent.context_window_size or DEFAULT_CONTEXT_WINDOW_SIZE if history: - messages.extend(history[-ctx_size:]) + messages.extend(history) messages.append({"role": "user", "content": user_text}) # Use actual user_id so the system prompt knows who it's chatting with diff --git a/backend/app/services/channel_commands.py b/backend/app/services/channel_commands.py new file mode 100644 index 000000000..cd7bd9ccd --- /dev/null +++ b/backend/app/services/channel_commands.py @@ -0,0 +1,75 @@ +"""Channel command handler for external channels (DingTalk, Feishu, etc.) + +Supports slash commands like /new to reset session context. +""" + +import uuid +from datetime import datetime, timezone +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.chat_session import ChatSession +from app.services.channel_session import find_or_create_channel_session + + +COMMANDS = {"/new", "/reset"} + + +def is_channel_command(text: str) -> bool: + """Check if the message is a recognized channel command.""" + stripped = text.strip().lower() + return stripped in COMMANDS + + +async def handle_channel_command( + db: AsyncSession, + command: str, + agent_id: uuid.UUID, + user_id: uuid.UUID, + external_conv_id: str, + source_channel: str, +) -> dict: + """Handle a channel command and return response info. + + Returns: + {"action": "new_session", "message": "..."} + """ + cmd = command.strip().lower() + + if cmd in ("/new", "/reset"): + # Find current session + result = await db.execute( + select(ChatSession).where( + ChatSession.agent_id == agent_id, + ChatSession.external_conv_id == external_conv_id, + ) + ) + old_session = result.scalar_one_or_none() + + if old_session: + # Rename old external_conv_id so find_or_create will make a new one + now = datetime.now(timezone.utc) + old_session.external_conv_id = ( + f"{external_conv_id}__archived_{now.strftime('%Y%m%d_%H%M%S')}" + ) + await db.flush() + + # Create new session + new_session = ChatSession( + agent_id=agent_id, + user_id=user_id, + title="New Session", + source_channel=source_channel, + external_conv_id=external_conv_id, + created_at=datetime.now(timezone.utc), + ) + db.add(new_session) + await db.flush() + + return { + "action": "new_session", + "session_id": str(new_session.id), + "message": "已开启新对话,之前的上下文已清除。", + } + + return {"action": "unknown", "message": f"未知命令: {cmd}"} diff --git a/backend/app/services/dingtalk_service.py b/backend/app/services/dingtalk_service.py index 010fc56a4..b07bc4fde 100644 --- a/backend/app/services/dingtalk_service.py +++ b/backend/app/services/dingtalk_service.py @@ -6,29 +6,12 @@ async def get_dingtalk_access_token(app_id: str, app_secret: str) -> dict: - """Get DingTalk access_token using app_id and app_secret. - - API: https://open.dingtalk.com/document/orgapp/obtain-access_token - """ - url = "https://oapi.dingtalk.com/gettoken" - params = { - "appkey": app_id, - "appsecret": app_secret, - } - - async with httpx.AsyncClient(timeout=10) as client: - try: - resp = await client.get(url, params=params) - data = resp.json() - - if data.get("errcode") == 0: - return {"access_token": data.get("access_token"), "expires_in": data.get("expires_in")} - else: - logger.error(f"[DingTalk] Failed to get access_token: {data}") - return {"errcode": data.get("errcode"), "errmsg": data.get("errmsg")} - except Exception as e: - logger.error(f"[DingTalk] Network error getting access_token: {e}") - return {"errcode": -1, "errmsg": str(e)} + """Get DingTalk access_token using centralized token manager with caching.""" + from app.services.dingtalk_token import dingtalk_token_manager + token = await dingtalk_token_manager.get_token(app_id, app_secret) + if token: + return {"access_token": token, "expires_in": 7200} + return {"access_token": None, "expires_in": 0} async def send_dingtalk_v1_robot_oto_message( diff --git a/backend/app/services/dingtalk_stream.py b/backend/app/services/dingtalk_stream.py index 28a8ba8e2..e2739b68c 100644 --- a/backend/app/services/dingtalk_stream.py +++ b/backend/app/services/dingtalk_stream.py @@ -94,13 +94,17 @@ async def process(self, callback: dingtalk_stream.CallbackMessage): if not user_text: return dingtalk_stream.AckMessage.STATUS_OK, "empty message" - sender_staff_id = incoming.sender_staff_id or incoming.sender_id or "" + sender_staff_id = incoming.sender_staff_id or "" + sender_id = incoming.sender_id or "" + if not sender_staff_id and sender_id: + sender_staff_id = sender_id # fallback + sender_nick = incoming.sender_nick or "" conversation_id = incoming.conversation_id or "" conversation_type = incoming.conversation_type or "1" session_webhook = incoming.session_webhook or "" logger.info( - f"[DingTalk Stream] Message from [{incoming.sender_nick}]{sender_staff_id}: {user_text[:80]}" + f"[DingTalk Stream] Message from [{sender_nick}]{sender_staff_id}: {user_text[:80]}" ) # Dispatch to the main FastAPI event loop for DB + LLM processing @@ -115,6 +119,8 @@ async def process(self, callback: dingtalk_stream.CallbackMessage): conversation_id=conversation_id, conversation_type=conversation_type, session_webhook=session_webhook, + sender_nick=sender_nick, + sender_id=sender_id, ), main_loop, ) diff --git a/backend/app/services/dingtalk_token.py b/backend/app/services/dingtalk_token.py new file mode 100644 index 000000000..09b516822 --- /dev/null +++ b/backend/app/services/dingtalk_token.py @@ -0,0 +1,77 @@ +"""DingTalk access_token global cache manager. + +Caches tokens per app_key with auto-refresh before expiry. +All DingTalk token acquisition should go through this manager. +""" + +import time +import asyncio +from typing import Dict, Optional, Tuple +from loguru import logger +import httpx + + +class DingTalkTokenManager: + """Global DingTalk access_token cache. + + - Cache by app_key + - Token valid for 7200s, refresh 300s early + - Concurrency-safe with asyncio.Lock + """ + + def __init__(self): + self._cache: Dict[str, Tuple[str, float]] = {} + self._locks: Dict[str, asyncio.Lock] = {} + + def _get_lock(self, app_key: str) -> asyncio.Lock: + if app_key not in self._locks: + self._locks[app_key] = asyncio.Lock() + return self._locks[app_key] + + async def get_token(self, app_key: str, app_secret: str) -> Optional[str]: + """Get access_token, return cached if valid, refresh if expired.""" + if app_key in self._cache: + token, expires_at = self._cache[app_key] + if time.time() < expires_at - 300: + return token + + async with self._get_lock(app_key): + # Double-check after acquiring lock + if app_key in self._cache: + token, expires_at = self._cache[app_key] + if time.time() < expires_at - 300: + return token + + try: + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.post( + "https://api.dingtalk.com/v1.0/oauth2/accessToken", + json={"appKey": app_key, "appSecret": app_secret}, + ) + data = resp.json() + token = data.get("accessToken") + expires_in = data.get("expireIn", 7200) + + if token: + self._cache[app_key] = (token, time.time() + expires_in) + logger.debug(f"[DingTalk Token] Refreshed for {app_key[:8]}..., expires in {expires_in}s") + return token + + logger.error(f"[DingTalk Token] Failed to get token: {data}") + return None + except Exception as e: + logger.error(f"[DingTalk Token] Error getting token: {e}") + return None + + async def get_corp_token(self, app_key: str, app_secret: str) -> Optional[str]: + """Get corp access_token via oapi.dingtalk.com/gettoken (GET). + + Used for corp API calls like /topapi/v2/user/get. + Shares the same cache since the token works for both APIs. + """ + # The v1.0 OAuth2 token also works for corp APIs, so reuse it + return await self.get_token(app_key, app_secret) + + +# Global singleton +dingtalk_token_manager = DingTalkTokenManager()