Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 85 additions & 2 deletions backend/app/api/dingtalk.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,54 @@
router = APIRouter(tags=["dingtalk"])


# ─── DingTalk Corp API helpers ──────────────────────────

async def _get_dingtalk_user_detail(
app_key: str,
app_secret: str,
staff_id: str,
) -> dict | None:
"""Query DingTalk user detail via corp API to get unionId/mobile/email.

Uses /topapi/v2/user/get, requires contact.user.read permission.
Returns None on failure (graceful degradation).
"""
import httpx
from app.services.dingtalk_token import dingtalk_token_manager

try:
access_token = await dingtalk_token_manager.get_corp_token(app_key, app_secret)
if not access_token:
return None

async with httpx.AsyncClient(timeout=10) as client:
user_resp = await client.post(
"https://oapi.dingtalk.com/topapi/v2/user/get",
params={"access_token": access_token},
json={"userid": staff_id, "language": "zh_CN"},
)
user_data = user_resp.json()

if user_data.get("errcode") != 0:
logger.warning(
f"[DingTalk] /topapi/v2/user/get failed for {staff_id}: "
f"errcode={user_data.get('errcode')} errmsg={user_data.get('errmsg')}"
)
return None

result = user_data.get("result", {})
return {
"unionid": result.get("unionid", ""),
"mobile": result.get("mobile", ""),
"email": result.get("email", "") or result.get("org_email", ""),
"name": result.get("name", ""),
}

except Exception as e:
logger.warning(f"[DingTalk] _get_dingtalk_user_detail error for {staff_id}: {e}")
return None


# ─── Config CRUD ────────────────────────────────────────

@router.post("/agents/{agent_id}/dingtalk-channel", response_model=ChannelConfigOut, status_code=201)
Expand Down Expand Up @@ -145,6 +193,8 @@ async def process_dingtalk_message(
conversation_id: str,
conversation_type: str,
session_webhook: str,
sender_nick: str = "",
sender_id: str = "",
):
"""Process an incoming DingTalk bot message and reply via session webhook."""
import json
Expand Down Expand Up @@ -177,13 +227,46 @@ async def process_dingtalk_message(
# P2P / single chat
conv_id = f"dingtalk_p2p_{sender_staff_id}"

# Resolve channel user via unified service (uses OrgMember + SSO patterns)
# Build extra_info for user resolution using senderStaffId as primary key.
# Try to enrich with corp API data (unionid/mobile/email) for better
# cross-channel matching when the basic OrgMember lookup fails.
extra_info: dict = {"name": sender_nick} if sender_nick else {}

# Load channel config for corp API calls
_cfg_r = await db.execute(
_select(ChannelConfig).where(
ChannelConfig.agent_id == agent_id,
ChannelConfig.channel_type == "dingtalk",
)
)
_cfg = _cfg_r.scalar_one_or_none()

if _cfg and _cfg.app_id and _cfg.app_secret:
dt_detail = await _get_dingtalk_user_detail(
_cfg.app_id, _cfg.app_secret, sender_staff_id
)
if dt_detail:
if dt_detail.get("unionid"):
extra_info["unionid"] = dt_detail["unionid"]
if dt_detail.get("mobile"):
extra_info["mobile"] = dt_detail["mobile"]
if dt_detail.get("email"):
extra_info["email"] = dt_detail["email"]
if dt_detail.get("name") and not sender_nick:
extra_info["name"] = dt_detail["name"]
logger.debug(
f"[DingTalk] Enriched user detail for {sender_staff_id}: "
f"unionid={dt_detail.get('unionid', '')[:8]}..."
)

# Resolve channel user via unified service (uses OrgMember + SSO patterns).
# senderStaffId is the stable enterprise userId — used as external_user_id.
platform_user = await channel_user_service.resolve_channel_user(
db=db,
agent=agent_obj,
channel_type="dingtalk",
external_user_id=sender_staff_id,
extra_info={"unionid": sender_staff_id},
extra_info=extra_info,
)
platform_user_id = platform_user.id

Expand Down
29 changes: 6 additions & 23 deletions backend/app/services/dingtalk_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,12 @@


async def get_dingtalk_access_token(app_id: str, app_secret: str) -> dict:
"""Get DingTalk access_token using app_id and app_secret.

API: https://open.dingtalk.com/document/orgapp/obtain-access_token
"""
url = "https://oapi.dingtalk.com/gettoken"
params = {
"appkey": app_id,
"appsecret": app_secret,
}

async with httpx.AsyncClient(timeout=10) as client:
try:
resp = await client.get(url, params=params)
data = resp.json()

if data.get("errcode") == 0:
return {"access_token": data.get("access_token"), "expires_in": data.get("expires_in")}
else:
logger.error(f"[DingTalk] Failed to get access_token: {data}")
return {"errcode": data.get("errcode"), "errmsg": data.get("errmsg")}
except Exception as e:
logger.error(f"[DingTalk] Network error getting access_token: {e}")
return {"errcode": -1, "errmsg": str(e)}
"""Get DingTalk access_token using centralized token manager with caching."""
from app.services.dingtalk_token import dingtalk_token_manager
token = await dingtalk_token_manager.get_token(app_id, app_secret)
if token:
return {"access_token": token, "expires_in": 7200}
return {"access_token": None, "expires_in": 0}


async def send_dingtalk_v1_robot_oto_message(
Expand Down
10 changes: 8 additions & 2 deletions backend/app/services/dingtalk_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,17 @@ async def process(self, callback: dingtalk_stream.CallbackMessage):
if not user_text:
return dingtalk_stream.AckMessage.STATUS_OK, "empty message"

sender_staff_id = incoming.sender_staff_id or incoming.sender_id or ""
sender_staff_id = incoming.sender_staff_id or ""
sender_id = incoming.sender_id or ""
if not sender_staff_id and sender_id:
sender_staff_id = sender_id # fallback
sender_nick = incoming.sender_nick or ""
conversation_id = incoming.conversation_id or ""
conversation_type = incoming.conversation_type or "1"
session_webhook = incoming.session_webhook or ""

logger.info(
f"[DingTalk Stream] Message from [{incoming.sender_nick}]{sender_staff_id}: {user_text[:80]}"
f"[DingTalk Stream] Message from [{sender_nick}]{sender_staff_id}: {user_text[:80]}"
)

# Dispatch to the main FastAPI event loop for DB + LLM processing
Expand All @@ -115,6 +119,8 @@ async def process(self, callback: dingtalk_stream.CallbackMessage):
conversation_id=conversation_id,
conversation_type=conversation_type,
session_webhook=session_webhook,
sender_nick=sender_nick,
sender_id=sender_id,
),
main_loop,
)
Expand Down
77 changes: 77 additions & 0 deletions backend/app/services/dingtalk_token.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
"""DingTalk access_token global cache manager.

Caches tokens per app_key with auto-refresh before expiry.
All DingTalk token acquisition should go through this manager.
"""

import time
import asyncio
from typing import Dict, Optional, Tuple
from loguru import logger
import httpx


class DingTalkTokenManager:
"""Global DingTalk access_token cache.

- Cache by app_key
- Token valid for 7200s, refresh 300s early
- Concurrency-safe with asyncio.Lock
"""

def __init__(self):
self._cache: Dict[str, Tuple[str, float]] = {}
self._locks: Dict[str, asyncio.Lock] = {}

def _get_lock(self, app_key: str) -> asyncio.Lock:
if app_key not in self._locks:
self._locks[app_key] = asyncio.Lock()
return self._locks[app_key]

async def get_token(self, app_key: str, app_secret: str) -> Optional[str]:
"""Get access_token, return cached if valid, refresh if expired."""
if app_key in self._cache:
token, expires_at = self._cache[app_key]
if time.time() < expires_at - 300:
return token

async with self._get_lock(app_key):
# Double-check after acquiring lock
if app_key in self._cache:
token, expires_at = self._cache[app_key]
if time.time() < expires_at - 300:
return token

try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(
"https://api.dingtalk.com/v1.0/oauth2/accessToken",
json={"appKey": app_key, "appSecret": app_secret},
)
data = resp.json()
token = data.get("accessToken")
expires_in = data.get("expireIn", 7200)

if token:
self._cache[app_key] = (token, time.time() + expires_in)
logger.debug(f"[DingTalk Token] Refreshed for {app_key[:8]}..., expires in {expires_in}s")
return token

logger.error(f"[DingTalk Token] Failed to get token: {data}")
return None
except Exception as e:
logger.error(f"[DingTalk Token] Error getting token: {e}")
return None

async def get_corp_token(self, app_key: str, app_secret: str) -> Optional[str]:
"""Get corp access_token via oapi.dingtalk.com/gettoken (GET).

Used for corp API calls like /topapi/v2/user/get.
Shares the same cache since the token works for both APIs.
"""
# The v1.0 OAuth2 token also works for corp APIs, so reuse it
return await self.get_token(app_key, app_secret)


# Global singleton
dingtalk_token_manager = DingTalkTokenManager()