-
Notifications
You must be signed in to change notification settings - Fork 573
Expand file tree
/
Copy pathdingtalk.py
More file actions
384 lines (336 loc) · 15.1 KB
/
dingtalk.py
File metadata and controls
384 lines (336 loc) · 15.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
"""DingTalk Channel API routes.
Provides Config CRUD and message handling for DingTalk bots using Stream mode.
"""
import uuid
from fastapi import APIRouter, Depends, HTTPException, status
from loguru import logger
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.permissions import check_agent_access, is_agent_creator
from app.core.security import get_current_user
from app.database import get_db
from app.models.channel_config import ChannelConfig
from app.models.user import User
from app.schemas.schemas import ChannelConfigOut
# Router on which the DingTalk channel-config endpoints and the OAuth callback below are registered.
router = APIRouter(tags=["dingtalk"])
# ─── Config CRUD ────────────────────────────────────────
@router.post("/agents/{agent_id}/dingtalk-channel", response_model=ChannelConfigOut, status_code=201)
async def configure_dingtalk_channel(
    agent_id: uuid.UUID,
    data: dict,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Create or update the DingTalk bot configuration for an agent.

    Request body fields:
        app_key: required, non-empty string (stored as ``app_id``).
        app_secret: required, non-empty string.
        extra_config: optional object; ``connection_mode`` (default
            ``"websocket"``) and ``agent_id`` (DingTalk AgentId, default ``""``)
            are read from it.

    Raises:
        HTTPException 403: the caller is not the agent's creator.
        HTTPException 422: ``app_key``/``app_secret`` missing or not strings,
            or ``extra_config`` is not an object.

    After the row is flushed, a Stream client is started when the mode is
    ``"websocket"``; when an existing config is switched to another mode the
    running client is stopped.
    """
    agent, _ = await check_agent_access(db, current_user, agent_id)
    if not is_agent_creator(current_user, agent):
        raise HTTPException(status_code=403, detail="Only creator can configure channel")

    # Non-string values (null, numbers, ...) are treated as missing so they
    # produce a 422 rather than an AttributeError on .strip().
    raw_key = data.get("app_key")
    raw_secret = data.get("app_secret")
    app_key = raw_key.strip() if isinstance(raw_key, str) else ""
    app_secret = raw_secret.strip() if isinstance(raw_secret, str) else ""
    if not app_key or not app_secret:
        raise HTTPException(status_code=422, detail="app_key and app_secret are required")

    # Handle connection mode (Stream/WebSocket vs Webhook) and agent_id
    extra_config = data.get("extra_config") or {}
    if not isinstance(extra_config, dict):
        raise HTTPException(status_code=422, detail="extra_config must be an object")
    conn_mode = extra_config.get("connection_mode", "websocket")
    dingtalk_agent_id = extra_config.get("agent_id", "")  # DingTalk AgentId for API messaging
    mode_settings = {"connection_mode": conn_mode, "agent_id": dingtalk_agent_id}

    result = await db.execute(
        select(ChannelConfig).where(
            ChannelConfig.agent_id == agent_id,
            ChannelConfig.channel_type == "dingtalk",
        )
    )
    config = result.scalar_one_or_none()
    is_update = config is not None

    if is_update:
        config.app_id = app_key
        config.app_secret = app_secret
        config.is_configured = True
        # Keep unrelated keys already stored; tolerate a NULL extra_config column.
        config.extra_config = {**(config.extra_config or {}), **mode_settings}
    else:
        # The AgentId is persisted on creation too, matching the update path.
        config = ChannelConfig(
            agent_id=agent_id,
            channel_type="dingtalk",
            app_id=app_key,
            app_secret=app_secret,
            is_configured=True,
            extra_config=mode_settings,
        )
        db.add(config)
    await db.flush()

    # The stream manager is imported only when it is actually needed, as the
    # original code did (a brand-new non-websocket config never touches it).
    # NOTE(review): the task handles are not retained here; asyncio documents
    # that un-referenced tasks may be garbage-collected — confirm the manager
    # keeps its own reference.
    if conn_mode == "websocket":
        import asyncio

        from app.services.dingtalk_stream import dingtalk_stream_manager

        # (Re)start the Stream client with the new credentials.
        asyncio.create_task(dingtalk_stream_manager.start_client(agent_id, app_key, app_secret))
    elif is_update:
        import asyncio

        from app.services.dingtalk_stream import dingtalk_stream_manager

        # Stop existing Stream client if switched to webhook
        asyncio.create_task(dingtalk_stream_manager.stop_client(agent_id))

    return ChannelConfigOut.model_validate(config)
@router.get("/agents/{agent_id}/dingtalk-channel", response_model=ChannelConfigOut)
async def get_dingtalk_channel(
    agent_id: uuid.UUID,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Return the stored DingTalk channel config for an agent.

    Access is checked via ``check_agent_access`` first. Responds with 404
    ("DingTalk not configured") when no ``dingtalk`` row exists for the agent.
    """
    await check_agent_access(db, current_user, agent_id)

    lookup = select(ChannelConfig).where(
        ChannelConfig.agent_id == agent_id,
        ChannelConfig.channel_type == "dingtalk",
    )
    channel = (await db.execute(lookup)).scalar_one_or_none()
    if channel:
        return ChannelConfigOut.model_validate(channel)
    raise HTTPException(status_code=404, detail="DingTalk not configured")
@router.delete("/agents/{agent_id}/dingtalk-channel", status_code=204)
async def delete_dingtalk_channel(
    agent_id: uuid.UUID,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Remove an agent's DingTalk channel config and stop its Stream client.

    Raises 403 when the caller is not the agent's creator and 404 when no
    ``dingtalk`` config row exists for the agent.
    """
    agent, _ = await check_agent_access(db, current_user, agent_id)
    if not is_agent_creator(current_user, agent):
        raise HTTPException(status_code=403, detail="Only creator can remove channel")

    lookup = select(ChannelConfig).where(
        ChannelConfig.agent_id == agent_id,
        ChannelConfig.channel_type == "dingtalk",
    )
    channel = (await db.execute(lookup)).scalar_one_or_none()
    if not channel:
        raise HTTPException(status_code=404, detail="DingTalk not configured")

    await db.delete(channel)

    # Stop Stream client (imports stay local to this function, as in the rest of the module)
    from app.services.dingtalk_stream import dingtalk_stream_manager
    import asyncio

    stop_coro = dingtalk_stream_manager.stop_client(agent_id)
    asyncio.create_task(stop_coro)
# ─── Message Processing (called by Stream callback) ────
async def _fetch_dingtalk_extra_info(db, agent_id: uuid.UUID, sender_staff_id: str) -> dict:
    """Build the identity hints for a DingTalk sender, best-effort.

    Starts from ``{"unionid": sender_staff_id}`` and, when the agent has a
    DingTalk config with credentials and the user-detail lookup returns data,
    replaces it with unionid/name/mobile/email/avatar_url taken from that
    detail. Any failure is logged as a warning and the fallback is returned.
    """
    extra_info: dict = {"unionid": sender_staff_id}
    try:
        cfg_r = await db.execute(
            select(ChannelConfig).where(
                ChannelConfig.agent_id == agent_id,
                ChannelConfig.channel_type == "dingtalk",
            )
        )
        dt_config = cfg_r.scalar_one_or_none()
        if dt_config and dt_config.app_id and dt_config.app_secret:
            from app.services.dingtalk_service import get_dingtalk_user_detail

            user_detail = await get_dingtalk_user_detail(
                dt_config.app_id, dt_config.app_secret, sender_staff_id
            )
            if user_detail:
                dt_email = user_detail.get("email", "") or user_detail.get("org_email", "")
                extra_info = {
                    "unionid": user_detail.get("unionid", "") or sender_staff_id,
                    "name": user_detail.get("name", ""),
                    # Empty strings are normalised to None for mobile/email.
                    "mobile": user_detail.get("mobile", "") or None,
                    "email": dt_email or None,
                    "avatar_url": user_detail.get("avatar", ""),
                }
    except Exception as e:
        logger.warning(f"[DingTalk] Failed to fetch user detail for {sender_staff_id}: {e}")
    return extra_info


async def _send_dingtalk_webhook_reply(session_webhook: str, title: str, text: str) -> None:
    """POST the reply to the session webhook as markdown, falling back to plain text.

    Never raises: both attempts are wrapped and failures are logged as errors.
    """
    import httpx

    try:
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.post(session_webhook, json={
                "msgtype": "markdown",
                "markdown": {
                    "title": title,
                    "text": text,
                },
            })
            # httpx does not raise on 4xx/5xx by itself; without this check an
            # HTTP-level failure would never reach the plain-text fallback.
            resp.raise_for_status()
    except Exception as e:
        logger.error(f"[DingTalk] Failed to reply via webhook: {e}")
        # Fallback: try plain text
        try:
            async with httpx.AsyncClient(timeout=10) as client:
                resp = await client.post(session_webhook, json={
                    "msgtype": "text",
                    "text": {"content": text},
                })
                resp.raise_for_status()
        except Exception as e2:
            logger.error(f"[DingTalk] Fallback text reply also failed: {e2}")


async def process_dingtalk_message(
    agent_id: uuid.UUID,
    sender_staff_id: str,
    user_text: str,
    conversation_id: str,
    conversation_type: str,
    session_webhook: str,
):
    """Process an incoming DingTalk bot message and reply via session webhook.

    Steps: load the agent (returns early with a warning if it does not exist),
    resolve the sender to a platform user, find or create the chat session,
    load recent history, persist the user message, call the agent LLM, send
    the reply to ``session_webhook``, persist the assistant message and log
    the activity.

    Args:
        agent_id: Agent that owns the DingTalk bot.
        sender_staff_id: DingTalk staff id of the sender; used as the external
            user id and as the one-to-one conversation key.
        user_text: Text of the incoming message.
        conversation_id: DingTalk conversation id; used as the group key.
        conversation_type: ``"2"`` is treated as a group chat, anything else
            as a one-to-one chat.
        session_webhook: URL the reply is POSTed to.
    """
    from datetime import datetime, timezone
    from app.database import async_session
    from app.models.agent import Agent as AgentModel, DEFAULT_CONTEXT_WINDOW_SIZE
    from app.models.audit import ChatMessage
    from app.services.channel_session import find_or_create_channel_session
    from app.services.channel_user_service import channel_user_service
    from app.api.feishu import _call_agent_llm

    async with async_session() as db:
        # Load agent
        agent_r = await db.execute(select(AgentModel).where(AgentModel.id == agent_id))
        agent_obj = agent_r.scalar_one_or_none()
        if not agent_obj:
            logger.warning(f"[DingTalk] Agent {agent_id} not found")
            return
        ctx_size = agent_obj.context_window_size or DEFAULT_CONTEXT_WINDOW_SIZE
        # Read the display name now so the reply title does not depend on the
        # ORM object's state after the commits below.
        reply_title = agent_obj.name or "AI Reply"

        # Determine conv_id for session isolation
        if conversation_type == "2":
            # Group chat
            conv_id = f"dingtalk_group_{conversation_id}"
        else:
            # P2P / single chat
            conv_id = f"dingtalk_p2p_{sender_staff_id}"

        # Fetch user detail from DingTalk corp API for cross-channel matching
        extra_info = await _fetch_dingtalk_extra_info(db, agent_id, sender_staff_id)

        # The real unionid may differ from sender_staff_id; pass it along as an
        # additional candidate id for OrgMember matching.
        real_unionid = extra_info.get("unionid")
        candidate_extra_ids: list[str] = []
        if real_unionid and real_unionid != sender_staff_id:
            candidate_extra_ids.append(real_unionid)

        platform_user = await channel_user_service.resolve_channel_user(
            db=db,
            agent=agent_obj,
            channel_type="dingtalk",
            external_user_id=sender_staff_id,
            extra_info=extra_info,
            extra_ids=candidate_extra_ids,
        )
        platform_user_id = platform_user.id

        # Find or create session
        sess = await find_or_create_channel_session(
            db=db,
            agent_id=agent_id,
            user_id=platform_user_id,
            external_conv_id=conv_id,
            source_channel="dingtalk",
            first_message_title=user_text,
        )
        session_conv_id = str(sess.id)

        # Load history: newest ctx_size messages, then reversed to chronological order
        history_r = await db.execute(
            select(ChatMessage)
            .where(ChatMessage.agent_id == agent_id, ChatMessage.conversation_id == session_conv_id)
            .order_by(ChatMessage.created_at.desc())
            .limit(ctx_size)
        )
        history = [{"role": m.role, "content": m.content} for m in reversed(history_r.scalars().all())]

        # Save user message
        db.add(ChatMessage(
            agent_id=agent_id, user_id=platform_user_id,
            role="user", content=user_text,
            conversation_id=session_conv_id,
        ))
        sess.last_message_at = datetime.now(timezone.utc)
        await db.commit()

        # Call LLM
        reply_text = await _call_agent_llm(
            db, agent_id, user_text,
            history=history, user_id=platform_user_id,
        )
        logger.info(f"[DingTalk] LLM reply: {reply_text[:100]}")

        # Reply via session webhook (markdown, with plain-text fallback)
        await _send_dingtalk_webhook_reply(session_webhook, reply_title, reply_text)

        # Save assistant reply
        db.add(ChatMessage(
            agent_id=agent_id, user_id=platform_user_id,
            role="assistant", content=reply_text,
            conversation_id=session_conv_id,
        ))
        sess.last_message_at = datetime.now(timezone.utc)
        await db.commit()

        # Log activity
        from app.services.activity_logger import log_activity
        await log_activity(
            agent_id, "chat_reply",
            f"Replied to DingTalk message: {reply_text[:80]}",
            detail={"channel": "dingtalk", "user_text": user_text[:200], "reply": reply_text[:500]},
        )
# ─── OAuth Callback (SSO) ──────────────────────────────
@router.get("/auth/dingtalk/callback")
async def dingtalk_callback(
    authCode: str,  # DingTalk uses authCode parameter
    state: str = None,
    db: AsyncSession = Depends(get_db),
):
    """Callback for DingTalk OAuth2 login.

    Args:
        authCode: Authorization code supplied by DingTalk.
        state: Optional SSO scan-session id (UUID string). When it resolves to
            an ``SSOScanSession``, that session's tenant is used to pick the
            provider and the session is marked authorized on success.

    Returns:
        An ``HTMLResponse`` in every case: an "Auth failed: ..." message on
        error, a redirect page to ``/sso/entry`` when a scan session was
        updated, otherwise a page containing the issued access token.
    """
    import html

    from app.models.identity import SSOScanSession
    from app.core.security import create_access_token
    from fastapi.responses import HTMLResponse
    from app.services.auth_registry import auth_provider_registry

    # 1. Resolve session to get tenant context
    tenant_id = None
    if state:
        try:
            sid = uuid.UUID(state)
            s_res = await db.execute(select(SSOScanSession).where(SSOScanSession.id == sid))
            session = s_res.scalar_one_or_none()
            if session:
                tenant_id = session.tenant_id
        except (ValueError, AttributeError):
            # A malformed state is not fatal: continue without tenant context,
            # but leave a trace instead of swallowing it silently.
            logger.debug("DingTalk callback: ignoring unparsable state {!r}", state)
    tenant_key = str(tenant_id) if tenant_id else None

    # 2. Get DingTalk provider config
    auth_provider = await auth_provider_registry.get_provider(db, "dingtalk", tenant_key)
    if not auth_provider:
        return HTMLResponse("Auth failed: DingTalk provider not configured for this tenant")

    # 3. Exchange code for token and get user info
    try:
        # Step 1: Exchange authCode for userAccessToken
        token_data = await auth_provider.exchange_code_for_token(authCode)
        access_token = token_data.get("access_token")
        if not access_token:
            logger.error(f"DingTalk token exchange failed: {token_data}")
            return HTMLResponse("Auth failed: Token exchange error")

        # Step 2: Get user info using modern v1.0 API
        user_info = await auth_provider.get_user_info(access_token)
        if not user_info.provider_union_id:
            logger.error(f"DingTalk user info missing unionId: {user_info.raw_data}")
            return HTMLResponse("Auth failed: No unionid returned")

        # Step 3: Find or create user (handles OrgMember linking)
        user, is_new = await auth_provider.find_or_create_user(
            db, user_info, tenant_id=tenant_key
        )
        if not user:
            return HTMLResponse("Auth failed: User resolution failed")
    except Exception as e:
        logger.error(f"DingTalk login error: {e}")
        # The exception text ends up in an HTML body, so escape it.
        return HTMLResponse(f"Auth failed: {html.escape(str(e))}")

    # 4. Standard login
    token = create_access_token(str(user.id), user.role)

    if state:
        try:
            sid = uuid.UUID(state)
            s_res = await db.execute(select(SSOScanSession).where(SSOScanSession.id == sid))
            session = s_res.scalar_one_or_none()
            if session:
                session.status = "authorized"
                session.provider_type = "dingtalk"
                session.user_id = user.id
                session.access_token = token
                session.error_msg = None
                await db.commit()
                # sid is a parsed UUID, so interpolating it into the script is safe.
                return HTMLResponse(
                    f"""<html><head><meta charset="utf-8" /></head>
<body style="font-family: sans-serif; padding: 24px;">
<div>SSO login successful. Redirecting...</div>
<script>window.location.href = "/sso/entry?sid={sid}&complete=1";</script>
</body></html>"""
                )
        except Exception as e:
            # loguru formats with str.format-style braces, not printf-style %s.
            logger.exception("Failed to update SSO session (dingtalk): {}", e)

    return HTMLResponse(f"Logged in. Token: {token}")