Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 148 additions & 8 deletions backend/app/api/dingtalk.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ async def configure_dingtalk_channel(
existing.is_configured = True
existing.extra_config = {**existing.extra_config, "connection_mode": conn_mode, "agent_id": dingtalk_agent_id}
await db.flush()

# Restart Stream client if in websocket mode
if conn_mode == "websocket":
from app.services.dingtalk_stream import dingtalk_stream_manager
Expand All @@ -68,7 +68,7 @@ async def configure_dingtalk_channel(
from app.services.dingtalk_stream import dingtalk_stream_manager
import asyncio
asyncio.create_task(dingtalk_stream_manager.stop_client(agent_id))

return ChannelConfigOut.model_validate(existing)

config = ChannelConfig(
Expand Down Expand Up @@ -145,8 +145,19 @@ async def process_dingtalk_message(
conversation_id: str,
conversation_type: str,
session_webhook: str,
image_base64_list: list[str] | None = None,
saved_file_paths: list[str] | None = None,
sender_nick: str = "",
message_id: str = "",
):
"""Process an incoming DingTalk bot message and reply via session webhook."""
"""Process an incoming DingTalk bot message and reply via session webhook.

Args:
image_base64_list: List of base64-encoded image data URIs for vision LLM.
saved_file_paths: List of local file paths where media files were saved.
sender_nick: Display name of the sender from DingTalk.
message_id: DingTalk message ID (used for reactions).
"""
import json
import httpx
from datetime import datetime, timezone
Expand Down Expand Up @@ -207,21 +218,150 @@ async def process_dingtalk_message(
)
history = [{"role": m.role, "content": m.content} for m in reversed(history_r.scalars().all())]

# Re-hydrate historical images for multi-turn LLM context
from app.services.image_context import rehydrate_image_messages
history = rehydrate_image_messages(history, agent_id, max_images=3)

# Build saved_content for DB (no base64 blobs, keep it display-friendly)
import re as _re_dt
_clean_text = _re_dt.sub(
r'\[image_data:data:image/[^;]+;base64,[A-Za-z0-9+/=]+\]',
"", user_text,
).strip()
if saved_file_paths:
from pathlib import Path as _PathDT
_file_prefixes = "\n".join(
f"[file:{_PathDT(p).name}]" for p in saved_file_paths
)
saved_content = f"{_file_prefixes}\n{_clean_text}".strip() if _clean_text else _file_prefixes
else:
saved_content = _clean_text or user_text

# Save user message
db.add(ChatMessage(
agent_id=agent_id, user_id=platform_user_id,
role="user", content=user_text,
role="user", content=saved_content,
conversation_id=session_conv_id,
))
sess.last_message_at = datetime.now(timezone.utc)
await db.commit()

# Build LLM input text: for images, inject base64 markers so vision models can see them
llm_user_text = user_text
if image_base64_list:
image_markers = "\n".join(
f"[image_data:{uri}]" for uri in image_base64_list
)
llm_user_text = f"{user_text}\n{image_markers}" if user_text else image_markers

# ── Set up channel_file_sender so the agent can send files via DingTalk ──
from app.services.agent_tools import channel_file_sender as _cfs
from app.services.dingtalk_stream import (
_upload_dingtalk_media,
_send_dingtalk_media_message,
)

# Load DingTalk credentials from ChannelConfig
_dt_cfg_r = await db.execute(
_select(ChannelConfig).where(
ChannelConfig.agent_id == agent_id,
ChannelConfig.channel_type == "dingtalk",
)
)
_dt_cfg = _dt_cfg_r.scalar_one_or_none()
_dt_app_key = _dt_cfg.app_id if _dt_cfg else None
_dt_app_secret = _dt_cfg.app_secret if _dt_cfg else None

_cfs_token = None
if _dt_app_key and _dt_app_secret:
# Determine send target: group -> conversation_id, P2P -> sender_staff_id
_dt_target_id = conversation_id if conversation_type == "2" else sender_staff_id
_dt_conv_type = conversation_type

async def _dingtalk_file_sender(file_path: str, msg: str = ""):
"""Send a file/image/video via DingTalk proactive message API."""
from pathlib import Path as _P

_fp = _P(file_path)
_ext = _fp.suffix.lower()

# Determine media type from extension
if _ext in (".jpg", ".jpeg", ".png", ".gif", ".bmp", ".webp"):
_media_type = "image"
elif _ext in (".mp4", ".mov", ".avi", ".mkv"):
_media_type = "video"
elif _ext in (".mp3", ".wav", ".ogg", ".amr", ".m4a"):
_media_type = "voice"
else:
_media_type = "file"

# Upload media to DingTalk
_mid = await _upload_dingtalk_media(
_dt_app_key, _dt_app_secret, file_path, _media_type
)

if _mid:
# Send via proactive message API
_ok = await _send_dingtalk_media_message(
_dt_app_key, _dt_app_secret,
_dt_target_id, _mid, _media_type,
_dt_conv_type, filename=_fp.name,
)
if _ok:
# Also send accompany text if provided
if msg:
try:
async with httpx.AsyncClient(timeout=10) as _cl:
await _cl.post(session_webhook, json={
"msgtype": "text",
"text": {"content": msg},
})
except Exception:
pass
return

# Fallback: send a text message with file info
_fallback_parts = []
if msg:
_fallback_parts.append(msg)
_fallback_parts.append(f"[File: {_fp.name}]")
try:
async with httpx.AsyncClient(timeout=10) as _cl:
await _cl.post(session_webhook, json={
"msgtype": "text",
"text": {"content": "\n\n".join(_fallback_parts)},
})
except Exception as _fb_err:
logger.error(f"[DingTalk] Fallback file text also failed: {_fb_err}")

_cfs_token = _cfs.set(_dingtalk_file_sender)

# Call LLM
reply_text = await _call_agent_llm(
db, agent_id, user_text,
history=history, user_id=platform_user_id,
try:
reply_text = await _call_agent_llm(
db, agent_id, llm_user_text,
history=history, user_id=platform_user_id,
)
finally:
# Reset ContextVar
if _cfs_token is not None:
_cfs.reset(_cfs_token)
# Recall thinking reaction (before sending reply)
if message_id and _dt_app_key:
try:
from app.services.dingtalk_reaction import recall_thinking_reaction
await recall_thinking_reaction(
_dt_app_key, _dt_app_secret,
message_id, conversation_id,
)
except Exception as _recall_err:
logger.warning(f"[DingTalk] Failed to recall thinking reaction: {_recall_err}")

has_media = bool(image_base64_list or saved_file_paths)
logger.info(
f"[DingTalk] LLM reply ({'media' if has_media else 'text'} input): "
f"{reply_text[:100]}"
)
logger.info(f"[DingTalk] LLM reply: {reply_text[:100]}")

# Reply via session webhook (markdown)
try:
Expand Down
5 changes: 5 additions & 0 deletions backend/app/api/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ async def call_llm(
# Load tools dynamically from DB
tools_for_llm = await get_agent_tools_for_llm(agent_id) if agent_id else AGENT_TOOLS

# Re-hydrate image context from previous turns for vision-capable models
if supports_vision and agent_id:
from app.services.image_context import rehydrate_image_messages
messages = rehydrate_image_messages(messages, agent_id)

# Convert messages to LLMMessage format
api_messages = [LLMMessage(role="system", content=static_prompt, dynamic_content=dynamic_prompt)]
for msg in messages:
Expand Down
110 changes: 110 additions & 0 deletions backend/app/services/dingtalk_reaction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
"""DingTalk emotion reaction service — "thinking" indicator on user messages."""

import asyncio
from loguru import logger
from app.services.dingtalk_token import dingtalk_token_manager


async def add_thinking_reaction(
    app_key: str,
    app_secret: str,
    message_id: str,
    conversation_id: str,
) -> bool:
    """Add the "thinking" (🤔思考中) text reaction to a user message.

    Best-effort helper: every failure is logged as a warning and reported
    through the return value instead of being raised.

    Args:
        app_key: DingTalk app key; also sent as ``robotCode``.
        app_secret: DingTalk app secret, used to obtain the access token.
        message_id: ID of the message to react to (sent as ``openMsgId``).
        conversation_id: Conversation ID (sent as ``openConversationId``).

    Returns:
        True when the API responds with HTTP 200; False when an argument is
        missing, no token could be obtained, the response is not 200, or an
        exception occurred.
    """
    # Validate before doing anything else so a no-op call costs nothing.
    if not message_id or not conversation_id or not app_key:
        return False

    try:
        # Imported inside the try so that even an import failure is reported
        # as False, keeping the "never raises" contract.
        import httpx

        token = await dingtalk_token_manager.get_token(app_key, app_secret)
        if not token:
            logger.warning("[DingTalk Reaction] Failed to get access token")
            return False

        async with httpx.AsyncClient(timeout=5) as client:
            resp = await client.post(
                "https://api.dingtalk.com/v1.0/robot/emotion/reply",
                headers={
                    "x-acs-dingtalk-access-token": token,
                    "Content-Type": "application/json",
                },
                json={
                    "robotCode": app_key,
                    "openMsgId": message_id,
                    "openConversationId": conversation_id,
                    "emotionType": 2,
                    "emotionName": "🤔思考中",
                    "textEmotion": {
                        "emotionId": "2659900",
                        "emotionName": "🤔思考中",
                        "text": "🤔思考中",
                        "backgroundId": "im_bg_1",
                    },
                },
            )
            if resp.status_code == 200:
                logger.info(f"[DingTalk Reaction] Thinking reaction added for msg {message_id[:16]}")
                return True

            # Body is truncated to keep the log line bounded.
            logger.warning(f"[DingTalk Reaction] Add failed: {resp.status_code} {resp.text[:200]}")
            return False
    except Exception as e:
        logger.warning(f"[DingTalk Reaction] Add thinking reaction error: {e}")
        return False


async def recall_thinking_reaction(
    app_key: str,
    app_secret: str,
    message_id: str,
    conversation_id: str,
) -> None:
    """Recall the "thinking" (🤔思考中) reaction from a user message.

    Makes up to three attempts, waiting 0 s, 1.5 s and 5 s before each one,
    and stops at the first HTTP 200. Failures of individual attempts are
    logged as warnings; a final warning is logged if every attempt fails.

    Args:
        app_key: DingTalk app key; also sent as ``robotCode``.
        app_secret: DingTalk app secret, used to obtain the access token.
        message_id: ID of the message carrying the reaction (``openMsgId``).
        conversation_id: Conversation ID (``openConversationId``).
    """
    # Nothing to recall without a target; return before importing anything.
    if not message_id or not conversation_id or not app_key:
        return

    import httpx

    # The request body is the same for every attempt, so build it once.
    payload = {
        "robotCode": app_key,
        "openMsgId": message_id,
        "openConversationId": conversation_id,
        "emotionType": 2,
        "emotionName": "🤔思考中",
        "textEmotion": {
            "emotionId": "2659900",
            "emotionName": "🤔思考中",
            "text": "🤔思考中",
            "backgroundId": "im_bg_1",
        },
    }

    delays = [0, 1.5, 5.0]

    for delay in delays:
        if delay > 0:
            await asyncio.sleep(delay)

        try:
            # The token is fetched per attempt rather than once up front.
            token = await dingtalk_token_manager.get_token(app_key, app_secret)
            if not token:
                # Log instead of skipping silently, as add_thinking_reaction does.
                logger.warning("[DingTalk Reaction] Failed to get access token")
                continue

            async with httpx.AsyncClient(timeout=5) as client:
                resp = await client.post(
                    "https://api.dingtalk.com/v1.0/robot/emotion/recall",
                    headers={
                        "x-acs-dingtalk-access-token": token,
                        "Content-Type": "application/json",
                    },
                    json=payload,
                )
                if resp.status_code == 200:
                    logger.info(f"[DingTalk Reaction] Thinking reaction recalled for msg {message_id[:16]}")
                    return

                logger.warning(f"[DingTalk Reaction] Recall attempt failed: {resp.status_code}")
        except Exception as e:
            logger.warning(f"[DingTalk Reaction] Recall error: {e}")

    logger.warning(f"[DingTalk Reaction] All recall attempts failed for msg {message_id[:16]}")
17 changes: 17 additions & 0 deletions backend/app/services/dingtalk_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,20 @@ async def send_dingtalk_message(
if not agent_id:
agent_id = app_id
return await send_dingtalk_corp_conversation(app_id, app_secret, user_id, msg_body, agent_id)


async def download_dingtalk_media(
    app_id: str, app_secret: str, download_code: str
) -> bytes | None:
    """Fetch a DingTalk media file identified by its downloadCode.

    Thin pass-through to the download helper of the same name in
    ``app.services.dingtalk_stream``; the arguments are forwarded unchanged
    and the helper's result is returned as-is.

    Args:
        app_id: DingTalk app key (robotCode).
        app_secret: DingTalk app secret.
        download_code: The downloadCode taken from the incoming message payload.

    Returns:
        The raw file bytes on success, or None on failure.
    """
    # The helper is imported at call time, not at module import time.
    from app.services.dingtalk_stream import download_dingtalk_media as _stream_download

    media_bytes = await _stream_download(app_id, app_secret, download_code)
    return media_bytes
Loading