Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 203 additions & 3 deletions backend/app/api/dingtalk.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,122 @@ async def delete_dingtalk_channel(
asyncio.create_task(dingtalk_stream_manager.stop_client(agent_id))


# ─── Message Dedup (processing → done state machine) ──────────────────────

# Maximum time, in seconds, a single message may stay in "processing". Once it
# elapses the claim expires on its own, so a DingTalk retransmit can enter again.
PROCESSING_TTL: float = 180.0
# Dedup retention window, in seconds, kept after a message was handled successfully.
DONE_TTL: float = 600.0
# The in-memory fallback is swept for expired entries once every this many
# calls to _dedup_gc.
_DEDUP_GC_EVERY: int = 100

# In-memory store: {message_id: (state, expire_at_monotonic)}; state ∈ {"processing", "done"}
_processed_messages: dict[str, tuple[str, float]] = {}
# Number of _dedup_gc calls so far; drives the periodic sweep.
_dedup_check_counter: int = 0

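# Redis client factory: kept as a replaceable module-level variable so tests can monkeypatch it to disable Redis.
# If the import fails it is set to None; _redis_client_or_none then returns None and the in-memory fallback is used.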
try:
from app.core.events import get_redis as _get_redis_client
except Exception: # pragma: no cover - import 层容错
_get_redis_client = None # type: ignore[assignment]


def _dedup_now() -> float:
    """Return the current monotonic clock reading, in seconds."""
    # Imported locally, as before, to keep the module's import block untouched.
    from time import monotonic

    return monotonic()


def _dedup_gc(now: float) -> None:
    """Count one call and, on every ``_DEDUP_GC_EVERY``-th call, purge expired entries.

    Args:
        now: Monotonic timestamp; entries whose expiry is strictly below it
            are removed from ``_processed_messages``.
    """
    global _dedup_check_counter
    _dedup_check_counter += 1
    if _dedup_check_counter % _DEDUP_GC_EVERY == 0:
        # Snapshot the stale keys first so the dict is not mutated while iterated.
        stale_ids = [
            msg_id
            for msg_id, entry in _processed_messages.items()
            if entry[1] < now
        ]
        for msg_id in stale_ids:
            _processed_messages.pop(msg_id)


async def _redis_client_or_none():
    """Return a Redis client from the module-level factory, or None.

    None is returned both when the factory itself is None and when calling
    the factory raises; callers treat None as "use the in-memory fallback".
    """
    factory = _get_redis_client
    if factory is None:
        return None
    try:
        client = await factory()
    except Exception:
        # Best effort: any failure to obtain a client means "no Redis".
        client = None
    return client


def _redis_key(message_id: str) -> str:
    """Build the Redis key under which the dedup state of *message_id* is stored."""
    return "dingtalk:dedup:{}".format(message_id)


async def acquire_dedup_lock(message_id: str) -> tuple[bool, str]:
    """Claim processing rights for a given DingTalk message_id.

    Redis is tried first with ``SET ... NX`` and a ``PROCESSING_TTL`` expiry.
    The in-memory table is used when no Redis client is available or when the
    Redis claim attempt itself raises, so a Redis error cannot abort the
    caller before the message has been processed.

    Args:
        message_id: DingTalk message id. An empty value disables dedup and is
            always accepted.

    Returns:
        ``(accepted, state)``:
            accepted=True  -> the caller owns the message, state="new"
            accepted=False -> already being handled or already finished,
                              state in {"processing", "done"}
    """
    if not message_id:
        return True, "new"

    redis = await _redis_client_or_none()
    if redis is not None:
        key = _redis_key(message_id)
        try:
            # Try to take the claim with SET NX; only read the state if that fails.
            claimed = await redis.set(
                key, "processing", ex=int(PROCESSING_TTL), nx=True
            )
        except Exception as exc:
            # Redis is reachable in principle but the call failed: degrade to
            # the in-memory table below instead of failing the message.
            logger.warning(
                f"[DingTalk] Redis dedup claim failed for message_id={message_id}; "
                f"using in-memory fallback: {exc}"
            )
        else:
            if claimed:
                return True, "new"
            try:
                existing = await redis.get(key)
            except Exception as exc:
                # SET NX was refused, so a claim exists; only its exact state
                # is unknown. Report the conservative "processing".
                logger.warning(
                    f"[DingTalk] Redis dedup state read failed for message_id={message_id}: {exc}"
                )
                return False, "processing"
            # The client may return bytes or str depending on its configuration.
            if existing in (b"done", "done"):
                return False, "done"
            return False, "processing"

    # In-memory fallback
    now = _dedup_now()
    hit = _processed_messages.get(message_id)
    if hit and hit[1] > now:
        return False, hit[0]
    _processed_messages[message_id] = ("processing", now + PROCESSING_TTL)
    _dedup_gc(now)
    return True, "new"


async def mark_dedup_done(message_id: str) -> None:
    """Transition processing → done with DONE_TTL.

    The state is written to Redis when a client is available. If that write
    raises, the error is logged and the "done" state is recorded in the
    in-memory table instead, so a message that was already handled
    successfully is not reported as failed because of a Redis error.

    Args:
        message_id: DingTalk message id; an empty value is ignored.
    """
    if not message_id:
        return
    redis = await _redis_client_or_none()
    if redis is not None:
        try:
            await redis.set(_redis_key(message_id), "done", ex=int(DONE_TTL))
        except Exception as exc:
            logger.warning(
                f"[DingTalk] Redis dedup mark-done failed for message_id={message_id}; "
                f"recording in memory instead: {exc}"
            )
        else:
            return
    now = _dedup_now()
    _processed_messages[message_id] = ("done", now + DONE_TTL)


async def release_dedup_lock(message_id: str) -> None:
    """Release processing lock on failure so a retransmit can re-attempt.

    Only a value of "processing" is removed, so a "done" marker is never
    deleted by mistake. Release is best effort: a Redis error is logged and
    not raised. The in-memory entry is cleared as well, so a claim that was
    recorded locally (for example while no Redis client was available) does
    not outlive the failure.

    Args:
        message_id: DingTalk message id; an empty value is ignored.
    """
    if not message_id:
        return
    redis = await _redis_client_or_none()
    if redis is not None:
        key = _redis_key(message_id)
        try:
            # NOTE: GET followed by DELETE is not atomic.
            current = await redis.get(key)
            if current in (b"processing", "processing"):
                await redis.delete(key)
        except Exception as exc:
            # Worst case the key simply expires after its TTL.
            logger.warning(
                f"[DingTalk] Redis dedup release failed for message_id={message_id}: {exc}"
            )
    # A no-op when the claim was only ever stored in Redis.
    hit = _processed_messages.get(message_id)
    if hit and hit[0] == "processing":
        del _processed_messages[message_id]


# ─── Message Processing (called by Stream callback) ────

async def process_dingtalk_message(
Expand All @@ -145,8 +261,56 @@ async def process_dingtalk_message(
conversation_id: str,
conversation_type: str,
session_webhook: str,
message_id: str = "",
):
"""Process an incoming DingTalk bot message and reply via session webhook."""
"""Process an incoming DingTalk bot message and reply via session webhook.

Dedup wrapper:
- acquire processing lock
- on success → mark done
- on exception → release lock so retransmit can retry
"""
accepted, state = await acquire_dedup_lock(message_id)
if not accepted:
logger.info(
f"[DingTalk] Skip duplicate message_id={message_id} (state={state})"
)
return

try:
await _process_dingtalk_message_inner(
agent_id=agent_id,
sender_staff_id=sender_staff_id,
user_text=user_text,
conversation_id=conversation_id,
conversation_type=conversation_type,
session_webhook=session_webhook,
)
except Exception:
logger.exception(
f"[DingTalk] Processing failed for message_id={message_id}; releasing dedup lock"
)
await release_dedup_lock(message_id)
raise
else:
await mark_dedup_done(message_id)


async def _process_dingtalk_message_inner(
*,
agent_id: uuid.UUID,
sender_staff_id: str,
user_text: str,
conversation_id: str,
conversation_type: str,
session_webhook: str,
) -> None:
"""Actual DingTalk message processing (outside the dedup wrapper).

Body is a verbatim move of the original process_dingtalk_message body,
starting from `import json` onward. The dedup check that used to be at
the top is now handled by the wrapper.
"""
import json
import httpx
from datetime import datetime, timezone
Expand Down Expand Up @@ -177,13 +341,49 @@ async def process_dingtalk_message(
# P2P / single chat
conv_id = f"dingtalk_p2p_{sender_staff_id}"

# Resolve channel user via unified service (uses OrgMember + SSO patterns)
# Fetch user detail from DingTalk corp API for cross-channel matching
extra_info: dict = {"unionid": sender_staff_id}
try:
cfg_r = await db.execute(
_select(ChannelConfig).where(
ChannelConfig.agent_id == agent_id,
ChannelConfig.channel_type == "dingtalk",
)
)
dt_config = cfg_r.scalar_one_or_none()
if dt_config and dt_config.app_id and dt_config.app_secret:
from app.services.dingtalk_service import get_dingtalk_user_detail
user_detail = await get_dingtalk_user_detail(
dt_config.app_id, dt_config.app_secret, sender_staff_id
)
if user_detail:
dt_mobile = user_detail.get("mobile", "")
dt_email = user_detail.get("email", "") or user_detail.get("org_email", "")
dt_unionid = user_detail.get("unionid", "")
dt_name = user_detail.get("name", "")
extra_info = {
"unionid": dt_unionid or sender_staff_id,
"name": dt_name,
"mobile": dt_mobile or None,
"email": dt_email or None,
"avatar_url": user_detail.get("avatar", ""),
}
except Exception as e:
logger.warning(f"[DingTalk] Failed to fetch user detail for {sender_staff_id}: {e}")

# 真实 unionid 可能与 sender_staff_id 不同; 一并作为候选参与 OrgMember 匹配
real_unionid = extra_info.get("unionid")
candidate_extra_ids: list[str] = []
if real_unionid and real_unionid != sender_staff_id:
candidate_extra_ids.append(real_unionid)

platform_user = await channel_user_service.resolve_channel_user(
db=db,
agent=agent_obj,
channel_type="dingtalk",
external_user_id=sender_staff_id,
extra_info={"unionid": sender_staff_id},
extra_info=extra_info,
extra_ids=candidate_extra_ids,
)
platform_user_id = platform_user.id

Expand Down
Loading