Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 38 additions & 2 deletions backend/app/api/dingtalk.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,49 @@ async def process_dingtalk_message(
# P2P / single chat
conv_id = f"dingtalk_p2p_{sender_staff_id}"

# Resolve channel user via unified service (uses OrgMember + SSO patterns)
# Fetch user detail from DingTalk corp API for cross-channel matching
extra_info: dict = {"unionid": sender_staff_id}
try:
cfg_r = await db.execute(
_select(ChannelConfig).where(
ChannelConfig.agent_id == agent_id,
ChannelConfig.channel_type == "dingtalk",
)
)
dt_config = cfg_r.scalar_one_or_none()
if dt_config and dt_config.app_id and dt_config.app_secret:
from app.services.dingtalk_service import get_dingtalk_user_detail
user_detail = await get_dingtalk_user_detail(
dt_config.app_id, dt_config.app_secret, sender_staff_id
)
if user_detail:
dt_mobile = user_detail.get("mobile", "")
dt_email = user_detail.get("email", "") or user_detail.get("org_email", "")
dt_unionid = user_detail.get("unionid", "")
dt_name = user_detail.get("name", "")
extra_info = {
"unionid": dt_unionid or sender_staff_id,
"name": dt_name,
"mobile": dt_mobile or None,
"email": dt_email or None,
"avatar_url": user_detail.get("avatar", ""),
}
except Exception as e:
logger.warning(f"[DingTalk] Failed to fetch user detail for {sender_staff_id}: {e}")

# 真实 unionid 可能与 sender_staff_id 不同; 一并作为候选参与 OrgMember 匹配
real_unionid = extra_info.get("unionid")
candidate_extra_ids: list[str] = []
if real_unionid and real_unionid != sender_staff_id:
candidate_extra_ids.append(real_unionid)

platform_user = await channel_user_service.resolve_channel_user(
db=db,
agent=agent_obj,
channel_type="dingtalk",
external_user_id=sender_staff_id,
extra_info={"unionid": sender_staff_id},
extra_info=extra_info,
extra_ids=candidate_extra_ids,
)
platform_user_id = platform_user.id

Expand Down
217 changes: 171 additions & 46 deletions backend/app/services/channel_user_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from typing import Any

from loguru import logger
from sqlalchemy import select
from sqlalchemy import or_, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.security import hash_password
Expand Down Expand Up @@ -51,6 +51,7 @@ async def resolve_channel_user(
channel_type: str,
external_user_id: str,
extra_info: dict[str, Any] | None = None,
extra_ids: list[str] | None = None,
) -> User:
"""Resolve channel user identity, find or create platform User.

Expand All @@ -66,6 +67,8 @@ async def resolve_channel_user(
channel_type: "dingtalk" | "wecom" | "feishu"
external_user_id: User ID from external platform (staff_id/userid/open_id)
extra_info: Optional name/avatar/mobile/email from platform API
extra_ids: Additional candidate identifiers (e.g. real unionid discovered
via user/get) OR-matched against OrgMember.unionid/external_id.

Returns:
Resolved User instance
Expand All @@ -76,9 +79,13 @@ async def resolve_channel_user(
# Step 1: Ensure IdentityProvider exists
provider = await self._ensure_provider(db, channel_type, tenant_id)

# Step 2: Try to find OrgMember by external identity
# Step 2: Try to find OrgMember by all candidate identifiers
candidate_ids: list[str] = [external_user_id]
for cid in (extra_ids or []):
if cid and cid not in candidate_ids:
candidate_ids.append(cid)
org_member = await self._find_org_member(
db, provider.id, channel_type, external_user_id
db, provider.id, channel_type, candidate_ids
)

# Step 3: Resolve User from OrgMember or other means
Expand All @@ -91,6 +98,13 @@ async def resolve_channel_user(
logger.debug(
f"[{channel_type}] Found user via linked OrgMember: {user.id}"
)
try:
await self._enrich_user_from_extra_info(db, user, extra_info)
except Exception:
logger.exception(
f"[{channel_type}] enrichment failed for user {user.id}; "
f"continuing without enrichment"
)
return user

# Step 4: Try to find User by email/mobile from extra_info
Expand All @@ -111,38 +125,41 @@ async def resolve_channel_user(
f"[{channel_type}] Matched user by mobile: {user.id}"
)

# If found User by email/mobile, link OrgMember if exists (only for org-sync channels)
# If found User by email/mobile, enrich and link OrgMember
if user:
try:
await self._enrich_user_from_extra_info(db, user, extra_info)
except Exception:
logger.exception(
f"[{channel_type}] enrichment failed for user {user.id}; "
f"continuing without enrichment"
)
if channel_type in ("feishu", "dingtalk", "wecom"):
if org_member and not org_member.user_id:
# Existing shell OrgMember not yet linked → link it
# Existing shell OrgMember not yet linked → link it + backfill ids
org_member.user_id = user.id
self._backfill_org_member_ids(
org_member, channel_type, external_user_id, extra_info
)
elif not org_member:
# No OrgMember found by external_id. Before creating a new shell,
# check if this user already has an OrgMember from org sync so
# we reuse it instead of creating a duplicate entry.
existing_member = await self._find_existing_org_member_for_user(
db, user.id, provider.id, tenant_id
)
if existing_member:
unionid, open_id, external_id = self._get_channel_ids(
channel_type, external_user_id, extra_info
# Reuse the org-synced record: back-fill channel identifiers
# so future direct lookups hit without another user/get call.
self._backfill_org_member_ids(
existing_member, channel_type, external_user_id, extra_info
)
if unionid and not existing_member.unionid:
existing_member.unionid = unionid
if open_id and not existing_member.open_id:
existing_member.open_id = open_id
if external_id and not existing_member.external_id:
existing_member.external_id = external_id
logger.info(
f"[{channel_type}] Reusing org-synced OrgMember {existing_member.id} "
f"for user {user.id} instead of creating a duplicate shell"
f"[{channel_type}] Reusing org-synced OrgMember "
f"{existing_member.id} for user {user.id}; "
f"back-filled channel identifiers"
)
else:
# Truly no OrgMember for this user → create shell
await self._create_org_member_shell(
db, provider, channel_type, external_user_id, extra_info,
linked_user_id=user.id
linked_user_id=user.id,
)
await db.flush()
return user
Expand All @@ -157,10 +174,13 @@ async def resolve_channel_user(
if channel_type in ("feishu", "dingtalk", "wecom"):
if org_member:
org_member.user_id = user.id
self._backfill_org_member_ids(
org_member, channel_type, external_user_id, extra_info
)
else:
await self._create_org_member_shell(
db, provider, channel_type, external_user_id, extra_info,
linked_user_id=user.id
linked_user_id=user.id,
)
await db.flush()
logger.info(
Expand Down Expand Up @@ -200,47 +220,37 @@ async def _find_org_member(
db: AsyncSession,
provider_id: uuid.UUID,
channel_type: str,
external_user_id: str,
candidate_ids: list[str],
) -> OrgMember | None:
"""Find OrgMember by external identity.
"""Find OrgMember by a list of candidate external identifiers.

For Feishu: try unionid first, then open_id, then external_id
For DingTalk: try unionid first, then external_id
For WeCom: try external_id (userid)

Returns None if OrgMember not found or org sync is not enabled for this channel.
所有候选 ID 走 OR 匹配, 适配钉钉同时拥有 staff_id 与 unionid 的场景。
"""
if not candidate_ids:
return None
try:
# Build OR conditions for matching
conditions = [OrgMember.provider_id == provider_id, OrgMember.status == "active"]
base = [OrgMember.provider_id == provider_id, OrgMember.status == "active"]

# Channel-specific matching priority
if channel_type == "feishu":
# Feishu: unionid is most stable, then open_id, then user_id
conditions.append(
(OrgMember.unionid == external_user_id) |
(OrgMember.open_id == external_user_id) |
(OrgMember.external_id == external_user_id)
id_match = or_(
OrgMember.unionid.in_(candidate_ids),
OrgMember.open_id.in_(candidate_ids),
OrgMember.external_id.in_(candidate_ids),
)
elif channel_type == "dingtalk":
# DingTalk: unionid is stable across apps, then external_id
conditions.append(
(OrgMember.unionid == external_user_id) |
(OrgMember.external_id == external_user_id)
id_match = or_(
OrgMember.unionid.in_(candidate_ids),
OrgMember.external_id.in_(candidate_ids),
)
elif channel_type == "wecom":
# WeCom: external_id (userid) is the primary identifier
conditions.append(OrgMember.external_id == external_user_id)
id_match = OrgMember.external_id.in_(candidate_ids)
else:
# Generic fallback (discord, slack, etc. - no org sync)
# These channels don't have OrgMember, return None immediately
return None

query = select(OrgMember).where(*conditions)
query = select(OrgMember).where(*base, id_match)
result = await db.execute(query)
return result.scalar_one_or_none()
except Exception as e:
# OrgMember table may not exist or org sync not enabled
logger.debug(f"[{channel_type}] OrgMember lookup failed: {e}")
return None

Expand Down Expand Up @@ -297,6 +307,121 @@ async def _find_existing_org_member_for_user(
result = await db.execute(query.limit(1))
return result.scalar_one_or_none()

def _backfill_org_member_ids(
self,
member: OrgMember,
channel_type: str,
external_user_id: str,
extra_info: dict[str, Any],
) -> None:
"""回填 channel 特定的 identifier 到现有 OrgMember(只填空字段)。

幂等: 重复调用不覆盖非空值。不写库, 依赖外层 flush。
"""
unionid_from_api = extra_info.get("unionid")

if channel_type == "dingtalk":
if not member.external_id and external_user_id:
member.external_id = external_user_id
if not member.unionid and unionid_from_api:
member.unionid = unionid_from_api

elif channel_type == "feishu":
if external_user_id.startswith("on_"):
if not member.unionid:
member.unionid = external_user_id
elif external_user_id.startswith("ou_"):
if not member.open_id:
member.open_id = external_user_id
if not member.external_id and external_user_id:
member.external_id = external_user_id
if not member.unionid and unionid_from_api:
member.unionid = unionid_from_api

elif channel_type == "wecom":
if not member.external_id and external_user_id:
member.external_id = external_user_id

async def _enrich_user_from_extra_info(
self,
db: AsyncSession,
user: User,
extra_info: dict[str, Any],
) -> None:
"""Enrich existing user with mobile/email/name from channel extra_info.

Only fills in fields that are currently empty on the user AND not
already claimed by another Identity (Identity.phone/email are globally
unique — writing a value that exists elsewhere would raise
IntegrityError and break the caller). On conflict, the field is
silently skipped (logged at warning level).
"""
from app.models.user import Identity

updated = False
name = extra_info.get("name")
mobile = extra_info.get("mobile")
email = extra_info.get("email")
avatar = extra_info.get("avatar_url")

if name and not user.display_name:
user.display_name = name
updated = True
if avatar and not user.avatar_url:
user.avatar_url = avatar
updated = True

# Enrich Identity-level fields (phone, email) if available.
# Pre-check for conflicts on globally unique fields to avoid
# IntegrityError from collision with another Identity.
if user.identity_id and (mobile or email):
identity = await db.get(Identity, user.identity_id)
if identity:
if mobile and not identity.phone:
if await self._identity_field_in_use(
db, Identity.phone, mobile, identity.id
):
logger.warning(
f"[enrich] phone={mobile} already claimed by another "
f"identity; skipping phone backfill for identity {identity.id}"
)
else:
identity.phone = mobile
updated = True
if email and not identity.email:
if await self._identity_field_in_use(
db, Identity.email, email, identity.id
):
logger.warning(
f"[enrich] email={email} already claimed by another "
f"identity; skipping email backfill for identity {identity.id}"
)
else:
identity.email = email
updated = True

if updated:
await db.flush()

async def _identity_field_in_use(
self,
db: AsyncSession,
column,
value: str,
exclude_identity_id: uuid.UUID,
) -> bool:
"""Check whether any OTHER Identity already holds the given value on column.

Used to pre-empt IntegrityError on Identity.phone/email (globally unique).
"""
from app.models.user import Identity

stmt = select(Identity.id).where(
column == value, Identity.id != exclude_identity_id
).limit(1)
result = await db.execute(stmt)
return result.scalar_one_or_none() is not None

async def _create_channel_user(
self,
db: AsyncSession,
Expand Down
Loading