From ea1fe8acbc864a9e354ec6984f41b0e22d23eb6e Mon Sep 17 00:00:00 2001 From: "nap.liu" Date: Wed, 8 Apr 2026 20:18:39 +0800 Subject: [PATCH] feat: enhance DingTalk user matching with senderStaffId priority and centralized token management Co-Authored-By: Claude Opus 4.6 (1M context) --- backend/app/api/dingtalk.py | 87 +++++++++++++++++++++++- backend/app/services/dingtalk_service.py | 29 ++------ backend/app/services/dingtalk_stream.py | 10 ++- backend/app/services/dingtalk_token.py | 77 +++++++++++++++++++++ 4 files changed, 176 insertions(+), 27 deletions(-) create mode 100644 backend/app/services/dingtalk_token.py diff --git a/backend/app/api/dingtalk.py b/backend/app/api/dingtalk.py index 9a431ecf2..02308e758 100644 --- a/backend/app/api/dingtalk.py +++ b/backend/app/api/dingtalk.py @@ -20,6 +20,54 @@ router = APIRouter(tags=["dingtalk"]) +# ─── DingTalk Corp API helpers ────────────────────────── + +async def _get_dingtalk_user_detail( + app_key: str, + app_secret: str, + staff_id: str, +) -> dict | None: + """Query DingTalk user detail via corp API to get unionId/mobile/email. + + Uses /topapi/v2/user/get, requires contact.user.read permission. + Returns None on failure (graceful degradation). + """ + import httpx + from app.services.dingtalk_token import dingtalk_token_manager + + try: + access_token = await dingtalk_token_manager.get_corp_token(app_key, app_secret) + if not access_token: + return None + + async with httpx.AsyncClient(timeout=10) as client: + user_resp = await client.post( + "https://oapi.dingtalk.com/topapi/v2/user/get", + params={"access_token": access_token}, + json={"userid": staff_id, "language": "zh_CN"}, + ) + user_data = user_resp.json() + + if user_data.get("errcode") != 0: + logger.warning( + f"[DingTalk] /topapi/v2/user/get failed for {staff_id}: " + f"errcode={user_data.get('errcode')} errmsg={user_data.get('errmsg')}" + ) + return None + + result = user_data.get("result", {}) + return { + "unionid": result.get("unionid", ""), + "mobile": result.get("mobile", ""), + "email": result.get("email", "") or result.get("org_email", ""), + "name": result.get("name", ""), + } + + except Exception as e: + logger.warning(f"[DingTalk] _get_dingtalk_user_detail error for {staff_id}: {e}") + return None + + # ─── Config CRUD ──────────────────────────────────────── @router.post("/agents/{agent_id}/dingtalk-channel", response_model=ChannelConfigOut, status_code=201) @@ -145,6 +193,8 @@ async def process_dingtalk_message( conversation_id: str, conversation_type: str, session_webhook: str, + sender_nick: str = "", + sender_id: str = "", ): """Process an incoming DingTalk bot message and reply via session webhook.""" import json @@ -177,13 +227,46 @@ async def process_dingtalk_message( # P2P / single chat conv_id = f"dingtalk_p2p_{sender_staff_id}" - # Resolve channel user via unified service (uses OrgMember + SSO patterns) + # Build extra_info for user resolution using senderStaffId as primary key. + # Try to enrich with corp API data (unionid/mobile/email) for better + # cross-channel matching when the basic OrgMember lookup fails. + extra_info: dict = {"name": sender_nick} if sender_nick else {} + + # Load channel config for corp API calls + _cfg_r = await db.execute( + _select(ChannelConfig).where( + ChannelConfig.agent_id == agent_id, + ChannelConfig.channel_type == "dingtalk", + ) + ) + _cfg = _cfg_r.scalar_one_or_none() + + if _cfg and _cfg.app_id and _cfg.app_secret: + dt_detail = await _get_dingtalk_user_detail( + _cfg.app_id, _cfg.app_secret, sender_staff_id + ) + if dt_detail: + if dt_detail.get("unionid"): + extra_info["unionid"] = dt_detail["unionid"] + if dt_detail.get("mobile"): + extra_info["mobile"] = dt_detail["mobile"] + if dt_detail.get("email"): + extra_info["email"] = dt_detail["email"] + if dt_detail.get("name") and not sender_nick: + extra_info["name"] = dt_detail["name"] + logger.debug( + f"[DingTalk] Enriched user detail for {sender_staff_id}: " + f"unionid={dt_detail.get('unionid', '')[:8]}..." + ) + + # Resolve channel user via unified service (uses OrgMember + SSO patterns). + # senderStaffId is the stable enterprise userId — used as external_user_id. platform_user = await channel_user_service.resolve_channel_user( db=db, agent=agent_obj, channel_type="dingtalk", external_user_id=sender_staff_id, - extra_info={"unionid": sender_staff_id}, + extra_info=extra_info, ) platform_user_id = platform_user.id diff --git a/backend/app/services/dingtalk_service.py b/backend/app/services/dingtalk_service.py index 010fc56a4..b07bc4fde 100644 --- a/backend/app/services/dingtalk_service.py +++ b/backend/app/services/dingtalk_service.py @@ -6,29 +6,12 @@ async def get_dingtalk_access_token(app_id: str, app_secret: str) -> dict: - """Get DingTalk access_token using app_id and app_secret. - - API: https://open.dingtalk.com/document/orgapp/obtain-access_token - """ - url = "https://oapi.dingtalk.com/gettoken" - params = { - "appkey": app_id, - "appsecret": app_secret, - } - - async with httpx.AsyncClient(timeout=10) as client: - try: - resp = await client.get(url, params=params) - data = resp.json() - - if data.get("errcode") == 0: - return {"access_token": data.get("access_token"), "expires_in": data.get("expires_in")} - else: - logger.error(f"[DingTalk] Failed to get access_token: {data}") - return {"errcode": data.get("errcode"), "errmsg": data.get("errmsg")} - except Exception as e: - logger.error(f"[DingTalk] Network error getting access_token: {e}") - return {"errcode": -1, "errmsg": str(e)} + """Get DingTalk access_token using centralized token manager with caching.""" + from app.services.dingtalk_token import dingtalk_token_manager + token = await dingtalk_token_manager.get_token(app_id, app_secret) + if token: + return {"access_token": token, "expires_in": 7200} + return {"access_token": None, "expires_in": 0} async def send_dingtalk_v1_robot_oto_message( diff --git a/backend/app/services/dingtalk_stream.py b/backend/app/services/dingtalk_stream.py index 28a8ba8e2..e2739b68c 100644 --- a/backend/app/services/dingtalk_stream.py +++ b/backend/app/services/dingtalk_stream.py @@ -94,13 +94,17 @@ async def process(self, callback: dingtalk_stream.CallbackMessage): if not user_text: return dingtalk_stream.AckMessage.STATUS_OK, "empty message" - sender_staff_id = incoming.sender_staff_id or incoming.sender_id or "" + sender_staff_id = incoming.sender_staff_id or "" + sender_id = incoming.sender_id or "" + if not sender_staff_id and sender_id: + sender_staff_id = sender_id # fallback + sender_nick = incoming.sender_nick or "" conversation_id = incoming.conversation_id or "" conversation_type = incoming.conversation_type or "1" session_webhook = incoming.session_webhook or "" logger.info( - f"[DingTalk Stream] Message from [{incoming.sender_nick}]{sender_staff_id}: {user_text[:80]}" + f"[DingTalk Stream] Message from [{sender_nick}]{sender_staff_id}: {user_text[:80]}" ) # Dispatch to the main FastAPI event loop for DB + LLM processing @@ -115,6 +119,8 @@ async def process(self, callback: dingtalk_stream.CallbackMessage): conversation_id=conversation_id, conversation_type=conversation_type, session_webhook=session_webhook, + sender_nick=sender_nick, + sender_id=sender_id, ), main_loop, ) diff --git a/backend/app/services/dingtalk_token.py b/backend/app/services/dingtalk_token.py new file mode 100644 index 000000000..09b516822 --- /dev/null +++ b/backend/app/services/dingtalk_token.py @@ -0,0 +1,77 @@ +"""DingTalk access_token global cache manager. + +Caches tokens per app_key with auto-refresh before expiry. +All DingTalk token acquisition should go through this manager. +""" + +import time +import asyncio +from typing import Dict, Optional, Tuple +from loguru import logger +import httpx + + +class DingTalkTokenManager: + """Global DingTalk access_token cache. + + - Cache by app_key + - Token valid for 7200s, refresh 300s early + - Concurrency-safe with asyncio.Lock + """ + + def __init__(self): + self._cache: Dict[str, Tuple[str, float]] = {} + self._locks: Dict[str, asyncio.Lock] = {} + + def _get_lock(self, app_key: str) -> asyncio.Lock: + if app_key not in self._locks: + self._locks[app_key] = asyncio.Lock() + return self._locks[app_key] + + async def get_token(self, app_key: str, app_secret: str) -> Optional[str]: + """Get access_token, return cached if valid, refresh if expired.""" + if app_key in self._cache: + token, expires_at = self._cache[app_key] + if time.time() < expires_at - 300: + return token + + async with self._get_lock(app_key): + # Double-check after acquiring lock + if app_key in self._cache: + token, expires_at = self._cache[app_key] + if time.time() < expires_at - 300: + return token + + try: + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.post( + "https://api.dingtalk.com/v1.0/oauth2/accessToken", + json={"appKey": app_key, "appSecret": app_secret}, + ) + data = resp.json() + token = data.get("accessToken") + expires_in = data.get("expireIn", 7200) + + if token: + self._cache[app_key] = (token, time.time() + expires_in) + logger.debug(f"[DingTalk Token] Refreshed for {app_key[:8]}..., expires in {expires_in}s") + return token + + logger.error(f"[DingTalk Token] Failed to get token: {data}") + return None + except Exception as e: + logger.error(f"[DingTalk Token] Error getting token: {e}") + return None + + async def get_corp_token(self, app_key: str, app_secret: str) -> Optional[str]: + """Get corp access_token via oapi.dingtalk.com/gettoken (GET). + + Used for corp API calls like /topapi/v2/user/get. + Shares the same cache since the token works for both APIs. + """ + # The v1.0 OAuth2 token also works for corp APIs, so reuse it + return await self.get_token(app_key, app_secret) + + +# Global singleton +dingtalk_token_manager = DingTalkTokenManager()