Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions backend/app/api/dingtalk.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,58 @@ async def delete_dingtalk_channel(
asyncio.create_task(dingtalk_stream_manager.stop_client(agent_id))


# ─── Message Dedup (prevents DingTalk retransmits from causing duplicate processing) ──

# In-memory dedup state, used by _check_message_dedup only when the Redis path fails.
# Maps message_id -> time.time() value recorded when the id was last accepted as new.
_processed_messages: dict[str, float] = {}
# Count of in-memory dedup checks that recorded an id; every 100th one triggers
# a purge of entries older than 10 minutes.
_dedup_check_counter: int = 0


# Tuning knobs for _check_message_dedup (values unchanged from the inline literals).
_DEDUP_TTL_SECONDS = 300  # a repeated message_id inside this window is a duplicate
_DEDUP_PURGE_AGE_SECONDS = 600  # in-memory entries older than this get purged
_DEDUP_PURGE_INTERVAL = 100  # purge once every N in-memory checks


async def _check_message_dedup(message_id: str) -> bool:
    """Report whether ``message_id`` has already been seen recently.

    The primary path is an atomic Redis ``SET ... NX EX`` on the key
    ``dingtalk:dedup:<message_id>``, which also works across processes.
    If anything in the Redis path raises, the failure is logged and the
    check falls back to the module-level ``_processed_messages`` dict,
    which only deduplicates within the current process.

    Args:
        message_id: Identifier of the incoming message. An empty value is
            never treated as a duplicate and is not recorded.

    Returns:
        True if the id was already recorded within the last
        ``_DEDUP_TTL_SECONDS`` seconds, otherwise False (and the id is
        recorded as seen).
    """
    global _dedup_check_counter

    if not message_id:
        return False

    # Primary path: Redis. Keep the try body limited to the calls that can
    # actually fail because of Redis, and handle the outcome in ``else``.
    try:
        from app.core.events import get_redis

        redis_client = await get_redis()
        # nx=True: only set when the key does not exist yet;
        # ex=...: let Redis expire the marker on its own.
        was_set = await redis_client.set(
            f"dingtalk:dedup:{message_id}",
            "1",
            ex=_DEDUP_TTL_SECONDS,
            nx=True,
        )
    except Exception as exc:
        # Best-effort: dedup must not break message handling, but a silent
        # fallback hides the loss of cross-process dedup, so log it.
        logger.warning(
            f"[DingTalk Dedup] Redis unavailable, using in-memory fallback: {exc!r}"
        )
    else:
        if not was_set:
            logger.info(f"[DingTalk Dedup] Duplicate message_id={message_id} (Redis)")
            return True
        return False

    # Fallback path: per-process in-memory map.
    import time as _time_dedup

    now = _time_dedup.time()

    seen_at = _processed_messages.get(message_id)
    if seen_at is not None and now - seen_at < _DEDUP_TTL_SECONDS:
        logger.info(f"[DingTalk Dedup] Duplicate message_id={message_id} (memory)")
        return True

    # New id, or one whose previous record is outside the window: (re)record it.
    _processed_messages[message_id] = now

    # Periodic cleanup so the fallback map cannot grow without bound.
    _dedup_check_counter += 1
    if _dedup_check_counter % _DEDUP_PURGE_INTERVAL == 0:
        cutoff = now - _DEDUP_PURGE_AGE_SECONDS
        # Collect keys first; deleting while iterating the dict would raise.
        stale_ids = [mid for mid, ts in _processed_messages.items() if ts < cutoff]
        for mid in stale_ids:
            del _processed_messages[mid]

    return False


# ─── Message Processing (called by Stream callback) ────

async def process_dingtalk_message(
Expand All @@ -145,8 +197,14 @@ async def process_dingtalk_message(
conversation_id: str,
conversation_type: str,
session_webhook: str,
message_id: str = "",
):
"""Process an incoming DingTalk bot message and reply via session webhook."""
# Dedup check: skip if this message_id was already processed
if await _check_message_dedup(message_id):
logger.info(f"[DingTalk] Skipping duplicate message_id={message_id}")
return

import json
import httpx
from datetime import datetime, timezone
Expand Down
12 changes: 4 additions & 8 deletions backend/app/services/dingtalk_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ async def process(self, callback: dingtalk_stream.CallbackMessage):
conversation_id = incoming.conversation_id or ""
conversation_type = incoming.conversation_type or "1"
session_webhook = incoming.session_webhook or ""
message_id = incoming.message_id or ""

logger.info(
f"[DingTalk Stream] Message from [{incoming.sender_nick}]{sender_staff_id}: {user_text[:80]}"
Expand All @@ -107,24 +108,19 @@ async def process(self, callback: dingtalk_stream.CallbackMessage):
from app.api.dingtalk import process_dingtalk_message

if main_loop and main_loop.is_running():
future = asyncio.run_coroutine_threadsafe(
asyncio.run_coroutine_threadsafe(
process_dingtalk_message(
agent_id=agent_id,
sender_staff_id=sender_staff_id,
user_text=user_text,
conversation_id=conversation_id,
conversation_type=conversation_type,
session_webhook=session_webhook,
message_id=message_id,
),
main_loop,
)
# Wait for result (with timeout)
try:
future.result(timeout=120)
except Exception as e:
logger.error(f"[DingTalk Stream] LLM processing error: {e}")
import traceback
traceback.print_exc()
# Fire-and-forget: ACK immediately, do not wait for LLM
else:
logger.warning("[DingTalk Stream] Main loop not available for dispatch")

Expand Down