diff --git a/backend/app/api/dingtalk.py b/backend/app/api/dingtalk.py index 9a431ecf..221fe92e 100644 --- a/backend/app/api/dingtalk.py +++ b/backend/app/api/dingtalk.py @@ -177,13 +177,49 @@ async def process_dingtalk_message( # P2P / single chat conv_id = f"dingtalk_p2p_{sender_staff_id}" - # Resolve channel user via unified service (uses OrgMember + SSO patterns) + # Fetch user detail from DingTalk corp API for cross-channel matching + extra_info: dict = {"unionid": sender_staff_id} + try: + cfg_r = await db.execute( + _select(ChannelConfig).where( + ChannelConfig.agent_id == agent_id, + ChannelConfig.channel_type == "dingtalk", + ) + ) + dt_config = cfg_r.scalar_one_or_none() + if dt_config and dt_config.app_id and dt_config.app_secret: + from app.services.dingtalk_service import get_dingtalk_user_detail + user_detail = await get_dingtalk_user_detail( + dt_config.app_id, dt_config.app_secret, sender_staff_id + ) + if user_detail: + dt_mobile = user_detail.get("mobile", "") + dt_email = user_detail.get("email", "") or user_detail.get("org_email", "") + dt_unionid = user_detail.get("unionid", "") + dt_name = user_detail.get("name", "") + extra_info = { + "unionid": dt_unionid or sender_staff_id, + "name": dt_name, + "mobile": dt_mobile or None, + "email": dt_email or None, + "avatar_url": user_detail.get("avatar", ""), + } + except Exception as e: + logger.warning(f"[DingTalk] Failed to fetch user detail for {sender_staff_id}: {e}") + + # 真实 unionid 可能与 sender_staff_id 不同; 一并作为候选参与 OrgMember 匹配 + real_unionid = extra_info.get("unionid") + candidate_extra_ids: list[str] = [] + if real_unionid and real_unionid != sender_staff_id: + candidate_extra_ids.append(real_unionid) + platform_user = await channel_user_service.resolve_channel_user( db=db, agent=agent_obj, channel_type="dingtalk", external_user_id=sender_staff_id, - extra_info={"unionid": sender_staff_id}, + extra_info=extra_info, + extra_ids=candidate_extra_ids, ) platform_user_id = platform_user.id diff --git a/backend/app/services/channel_user_service.py b/backend/app/services/channel_user_service.py index 08a09cca..92b0c6ce 100644 --- a/backend/app/services/channel_user_service.py +++ b/backend/app/services/channel_user_service.py @@ -9,7 +9,7 @@ from typing import Any from loguru import logger -from sqlalchemy import select +from sqlalchemy import or_, select from sqlalchemy.ext.asyncio import AsyncSession from app.core.security import hash_password @@ -51,6 +51,7 @@ async def resolve_channel_user( channel_type: str, external_user_id: str, extra_info: dict[str, Any] | None = None, + extra_ids: list[str] | None = None, ) -> User: """Resolve channel user identity, find or create platform User. @@ -66,6 +67,8 @@ async def resolve_channel_user( channel_type: "dingtalk" | "wecom" | "feishu" external_user_id: User ID from external platform (staff_id/userid/open_id) extra_info: Optional name/avatar/mobile/email from platform API + extra_ids: Additional candidate identifiers (e.g. real unionid discovered + via user/get) OR-matched against OrgMember.unionid/external_id. Returns: Resolved User instance @@ -76,9 +79,13 @@ async def resolve_channel_user( # Step 1: Ensure IdentityProvider exists provider = await self._ensure_provider(db, channel_type, tenant_id) - # Step 2: Try to find OrgMember by external identity + # Step 2: Try to find OrgMember by all candidate identifiers + candidate_ids: list[str] = [external_user_id] + for cid in (extra_ids or []): + if cid and cid not in candidate_ids: + candidate_ids.append(cid) org_member = await self._find_org_member( - db, provider.id, channel_type, external_user_id + db, provider.id, channel_type, candidate_ids ) # Step 3: Resolve User from OrgMember or other means @@ -91,6 +98,13 @@ async def resolve_channel_user( logger.debug( f"[{channel_type}] Found user via linked OrgMember: {user.id}" ) + try: + await self._enrich_user_from_extra_info(db, user, extra_info) + except Exception: + logger.exception( + f"[{channel_type}] enrichment failed for user {user.id}; " + f"continuing without enrichment" + ) return user # Step 4: Try to find User by email/mobile from extra_info @@ -111,38 +125,41 @@ async def resolve_channel_user( f"[{channel_type}] Matched user by mobile: {user.id}" ) - # If found User by email/mobile, link OrgMember if exists (only for org-sync channels) + # If found User by email/mobile, enrich and link OrgMember if user: + try: + await self._enrich_user_from_extra_info(db, user, extra_info) + except Exception: + logger.exception( + f"[{channel_type}] enrichment failed for user {user.id}; " + f"continuing without enrichment" + ) if channel_type in ("feishu", "dingtalk", "wecom"): if org_member and not org_member.user_id: - # Existing shell OrgMember not yet linked → link it + # Existing shell OrgMember not yet linked → link it + backfill ids org_member.user_id = user.id + self._backfill_org_member_ids( + org_member, channel_type, external_user_id, extra_info + ) elif not org_member: - # No OrgMember found by external_id. Before creating a new shell, - # check if this user already has an OrgMember from org sync so - # we reuse it instead of creating a duplicate entry. existing_member = await self._find_existing_org_member_for_user( db, user.id, provider.id, tenant_id ) if existing_member: - unionid, open_id, external_id = self._get_channel_ids( - channel_type, external_user_id, extra_info + # Reuse the org-synced record: back-fill channel identifiers + # so future direct lookups hit without another user/get call. + self._backfill_org_member_ids( + existing_member, channel_type, external_user_id, extra_info ) - if unionid and not existing_member.unionid: - existing_member.unionid = unionid - if open_id and not existing_member.open_id: - existing_member.open_id = open_id - if external_id and not existing_member.external_id: - existing_member.external_id = external_id logger.info( - f"[{channel_type}] Reusing org-synced OrgMember {existing_member.id} " - f"for user {user.id} instead of creating a duplicate shell" + f"[{channel_type}] Reusing org-synced OrgMember " + f"{existing_member.id} for user {user.id}; " + f"back-filled channel identifiers" ) else: - # Truly no OrgMember for this user → create shell await self._create_org_member_shell( db, provider, channel_type, external_user_id, extra_info, - linked_user_id=user.id + linked_user_id=user.id, ) await db.flush() return user @@ -157,10 +174,13 @@ async def resolve_channel_user( if channel_type in ("feishu", "dingtalk", "wecom"): if org_member: org_member.user_id = user.id + self._backfill_org_member_ids( + org_member, channel_type, external_user_id, extra_info + ) else: await self._create_org_member_shell( db, provider, channel_type, external_user_id, extra_info, - linked_user_id=user.id + linked_user_id=user.id, ) await db.flush() logger.info( @@ -200,47 +220,37 @@ async def _find_org_member( db: AsyncSession, provider_id: uuid.UUID, channel_type: str, - external_user_id: str, + candidate_ids: list[str], ) -> OrgMember | None: - """Find OrgMember by external identity. + """Find OrgMember by a list of candidate external identifiers. - For Feishu: try unionid first, then open_id, then external_id - For DingTalk: try unionid first, then external_id - For WeCom: try external_id (userid) - - Returns None if OrgMember not found or org sync is not enabled for this channel. + 所有候选 ID 走 OR 匹配, 适配钉钉同时拥有 staff_id 与 unionid 的场景。 """ + if not candidate_ids: + return None try: - # Build OR conditions for matching - conditions = [OrgMember.provider_id == provider_id, OrgMember.status == "active"] + base = [OrgMember.provider_id == provider_id, OrgMember.status == "active"] - # Channel-specific matching priority if channel_type == "feishu": - # Feishu: unionid is most stable, then open_id, then user_id - conditions.append( - (OrgMember.unionid == external_user_id) | - (OrgMember.open_id == external_user_id) | - (OrgMember.external_id == external_user_id) + id_match = or_( + OrgMember.unionid.in_(candidate_ids), + OrgMember.open_id.in_(candidate_ids), + OrgMember.external_id.in_(candidate_ids), ) elif channel_type == "dingtalk": - # DingTalk: unionid is stable across apps, then external_id - conditions.append( - (OrgMember.unionid == external_user_id) | - (OrgMember.external_id == external_user_id) + id_match = or_( + OrgMember.unionid.in_(candidate_ids), + OrgMember.external_id.in_(candidate_ids), ) elif channel_type == "wecom": - # WeCom: external_id (userid) is the primary identifier - conditions.append(OrgMember.external_id == external_user_id) + id_match = OrgMember.external_id.in_(candidate_ids) else: - # Generic fallback (discord, slack, etc. - no org sync) - # These channels don't have OrgMember, return None immediately return None - query = select(OrgMember).where(*conditions) + query = select(OrgMember).where(*base, id_match) result = await db.execute(query) return result.scalar_one_or_none() except Exception as e: - # OrgMember table may not exist or org sync not enabled logger.debug(f"[{channel_type}] OrgMember lookup failed: {e}") return None @@ -297,6 +307,121 @@ async def _find_existing_org_member_for_user( result = await db.execute(query.limit(1)) return result.scalar_one_or_none() + def _backfill_org_member_ids( + self, + member: OrgMember, + channel_type: str, + external_user_id: str, + extra_info: dict[str, Any], + ) -> None: + """回填 channel 特定的 identifier 到现有 OrgMember(只填空字段)。 + + 幂等: 重复调用不覆盖非空值。不写库, 依赖外层 flush。 + """ + unionid_from_api = extra_info.get("unionid") + + if channel_type == "dingtalk": + if not member.external_id and external_user_id: + member.external_id = external_user_id + if not member.unionid and unionid_from_api: + member.unionid = unionid_from_api + + elif channel_type == "feishu": + if external_user_id.startswith("on_"): + if not member.unionid: + member.unionid = external_user_id + elif external_user_id.startswith("ou_"): + if not member.open_id: + member.open_id = external_user_id + if not member.external_id and external_user_id: + member.external_id = external_user_id + if not member.unionid and unionid_from_api: + member.unionid = unionid_from_api + + elif channel_type == "wecom": + if not member.external_id and external_user_id: + member.external_id = external_user_id + + async def _enrich_user_from_extra_info( + self, + db: AsyncSession, + user: User, + extra_info: dict[str, Any], + ) -> None: + """Enrich existing user with mobile/email/name from channel extra_info. + + Only fills in fields that are currently empty on the user AND not + already claimed by another Identity (Identity.phone/email are globally + unique — writing a value that exists elsewhere would raise + IntegrityError and break the caller). On conflict, the field is + silently skipped (logged at warning level). + """ + from app.models.user import Identity + + updated = False + name = extra_info.get("name") + mobile = extra_info.get("mobile") + email = extra_info.get("email") + avatar = extra_info.get("avatar_url") + + if name and not user.display_name: + user.display_name = name + updated = True + if avatar and not user.avatar_url: + user.avatar_url = avatar + updated = True + + # Enrich Identity-level fields (phone, email) if available. + # Pre-check for conflicts on globally unique fields to avoid + # IntegrityError from collision with another Identity. + if user.identity_id and (mobile or email): + identity = await db.get(Identity, user.identity_id) + if identity: + if mobile and not identity.phone: + if await self._identity_field_in_use( + db, Identity.phone, mobile, identity.id + ): + logger.warning( + f"[enrich] phone={mobile} already claimed by another " + f"identity; skipping phone backfill for identity {identity.id}" + ) + else: + identity.phone = mobile + updated = True + if email and not identity.email: + if await self._identity_field_in_use( + db, Identity.email, email, identity.id + ): + logger.warning( + f"[enrich] email={email} already claimed by another " + f"identity; skipping email backfill for identity {identity.id}" + ) + else: + identity.email = email + updated = True + + if updated: + await db.flush() + + async def _identity_field_in_use( + self, + db: AsyncSession, + column, + value: str, + exclude_identity_id: uuid.UUID, + ) -> bool: + """Check whether any OTHER Identity already holds the given value on column. + + Used to pre-empt IntegrityError on Identity.phone/email (globally unique). + """ + from app.models.user import Identity + + stmt = select(Identity.id).where( + column == value, Identity.id != exclude_identity_id + ).limit(1) + result = await db.execute(stmt) + return result.scalar_one_or_none() is not None + async def _create_channel_user( self, db: AsyncSession, diff --git a/backend/app/services/dingtalk_service.py b/backend/app/services/dingtalk_service.py index 010fc56a..5eae0cda 100644 --- a/backend/app/services/dingtalk_service.py +++ b/backend/app/services/dingtalk_service.py @@ -31,6 +31,38 @@ async def get_dingtalk_access_token(app_id: str, app_secret: str) -> dict: return {"errcode": -1, "errmsg": str(e)} +async def get_dingtalk_user_detail(app_id: str, app_secret: str, userid: str) -> dict | None: + """Fetch user detail from DingTalk corp API by userid (staff_id). + + Returns dict with mobile, email, org_email, unionid, name, etc. + Returns None on failure. + + API: https://open.dingtalk.com/document/orgapp/query-user-details + """ + token_result = await get_dingtalk_access_token(app_id, app_secret) + access_token = token_result.get("access_token") + if not access_token: + return None + + url = "https://oapi.dingtalk.com/topapi/v2/user/get" + async with httpx.AsyncClient(timeout=10) as client: + try: + resp = await client.post( + url, + params={"access_token": access_token}, + json={"userid": userid}, + ) + data = resp.json() + if data.get("errcode") == 0: + return data.get("result", {}) + else: + logger.warning(f"[DingTalk] user/get failed for {userid}: {data.get('errmsg')}") + return None + except Exception as e: + logger.warning(f"[DingTalk] user/get error for {userid}: {e}") + return None + + async def send_dingtalk_v1_robot_oto_message( app_id: str, app_secret: str, diff --git a/backend/tests/test_channel_user_service_identity.py b/backend/tests/test_channel_user_service_identity.py new file mode 100644 index 00000000..771941ee --- /dev/null +++ b/backend/tests/test_channel_user_service_identity.py @@ -0,0 +1,592 @@ +"""channel_user_service 的 OrgMember 匹配与回填逻辑。 + +不走 DB: 用 FakeSession 吸收 session 方法, monkeypatch 替换查询入口。 +聚焦: resolve_channel_user 如何组合 find → match → backfill → link 这一条链。 +""" +from __future__ import annotations + +import uuid +from types import SimpleNamespace + +import pytest + +from app.services import channel_user_service as cus_mod +from app.services.channel_user_service import channel_user_service + + +class _FakeSession: + """吸收 resolve_channel_user 用到的 session 方法, 行为对业务无副作用。""" + + def __init__(self) -> None: + self.added: list = [] + self.flushed = 0 + + def add(self, obj): + self.added.append(obj) + + async def flush(self): + self.flushed += 1 + + async def get(self, model, key): + return None + + async def execute(self, _query): + class _R: + def scalar_one_or_none(self_inner): + return None + return _R() + + +@pytest.fixture +def fake_session(): + return _FakeSession() + + +@pytest.fixture +def agent(): + return SimpleNamespace(id=uuid.uuid4(), tenant_id=uuid.uuid4(), name="A") + + +@pytest.fixture +def patch_provider(monkeypatch): + """跳过 provider 查询; 直接返回固定 IdentityProvider.""" + provider = SimpleNamespace(id=uuid.uuid4(), tenant_id=None, provider_type="dingtalk") + + async def _fake_ensure(self, db, provider_type, tenant_id): + return provider + + monkeypatch.setattr( + cus_mod.ChannelUserService, "_ensure_provider", _fake_ensure + ) + return provider + + +async def test_find_org_member_receives_candidate_ids( + fake_session, agent, patch_provider, monkeypatch +): + captured = {} + + async def _fake_find(self, db, provider_id, channel_type, candidate_ids): + captured["ids"] = list(candidate_ids) + user = SimpleNamespace( + id=uuid.uuid4(), identity_id=None, + display_name="Bob", avatar_url=None, + ) + member = SimpleNamespace(id=uuid.uuid4(), user_id=user.id) + fake_session._preloaded_user = user + return member + + async def _fake_db_get(model, key): + return fake_session._preloaded_user + + monkeypatch.setattr(cus_mod.ChannelUserService, "_find_org_member", _fake_find) + monkeypatch.setattr(fake_session, "get", _fake_db_get, raising=False) + + await channel_user_service.resolve_channel_user( + db=fake_session, + agent=agent, + channel_type="dingtalk", + external_user_id="staff-1", + extra_info={"unionid": "UNION-1"}, + extra_ids=["UNION-1"], + ) + + assert captured["ids"] == ["staff-1", "UNION-1"] + + +async def test_find_org_member_deduplicates_candidate_ids( + fake_session, agent, patch_provider, monkeypatch +): + captured = {} + + async def _fake_find(self, db, provider_id, channel_type, candidate_ids): + captured["ids"] = list(candidate_ids) + user = SimpleNamespace(id=uuid.uuid4(), identity_id=None) + member = SimpleNamespace(id=uuid.uuid4(), user_id=user.id) + fake_session._preloaded_user = user + return member + + monkeypatch.setattr(cus_mod.ChannelUserService, "_find_org_member", _fake_find) + + async def _fake_db_get(model, key): + return fake_session._preloaded_user + + monkeypatch.setattr(fake_session, "get", _fake_db_get, raising=False) + + await channel_user_service.resolve_channel_user( + db=fake_session, + agent=agent, + channel_type="dingtalk", + external_user_id="staff-1", + extra_info={"unionid": "staff-1"}, + extra_ids=["staff-1"], + ) + + assert captured["ids"] == ["staff-1"] # 去重 + + +class _RecordingSession: + """Captures the SQL from db.execute without running it.""" + + def __init__(self): + self.last_stmt = None + + async def execute(self, stmt): + self.last_stmt = stmt + + class _R: + def scalar_one_or_none(self_inner): + return None + + return _R() + + +async def test_find_org_member_sql_dingtalk(): + sess = _RecordingSession() + await channel_user_service._find_org_member( + sess, uuid.uuid4(), "dingtalk", ["staff-1", "UNION-1"] + ) + sql = str(sess.last_stmt.compile(compile_kwargs={"literal_binds": True})) + # Isolate the WHERE clause so SELECT-column references don't pollute checks + where_clause = sql.split("WHERE", 1)[1] + # dingtalk: OR over unionid + external_id, NOT open_id IN (...) + assert "org_members.unionid IN" in where_clause + assert "org_members.external_id IN" in where_clause + assert "org_members.open_id IN" not in where_clause + assert "'staff-1'" in where_clause and "'UNION-1'" in where_clause + + +async def test_find_org_member_sql_feishu(): + sess = _RecordingSession() + await channel_user_service._find_org_member( + sess, uuid.uuid4(), "feishu", ["ou_x", "on_y"] + ) + sql = str(sess.last_stmt.compile(compile_kwargs={"literal_binds": True})) + where_clause = sql.split("WHERE", 1)[1] + # feishu: OR over unionid + open_id + external_id + assert "org_members.unionid IN" in where_clause + assert "org_members.open_id IN" in where_clause + assert "org_members.external_id IN" in where_clause + + +async def test_find_org_member_sql_wecom(): + sess = _RecordingSession() + await channel_user_service._find_org_member( + sess, uuid.uuid4(), "wecom", ["userid-1"] + ) + sql = str(sess.last_stmt.compile(compile_kwargs={"literal_binds": True})) + where_clause = sql.split("WHERE", 1)[1] + # wecom: external_id only, no unionid IN / open_id IN in WHERE + assert "org_members.external_id IN" in where_clause + assert "org_members.unionid IN" not in where_clause + assert "org_members.open_id IN" not in where_clause + + +async def test_find_org_member_empty_ids_returns_none_without_execute(): + sess = _RecordingSession() + result = await channel_user_service._find_org_member( + sess, uuid.uuid4(), "dingtalk", [] + ) + assert result is None + assert sess.last_stmt is None # short-circuits, no execute + + +def _make_member(**kwargs): + defaults = dict( + id=uuid.uuid4(), external_id=None, unionid=None, open_id=None, user_id=None, + ) + defaults.update(kwargs) + return SimpleNamespace(**defaults) + + +def _fake_provider_for(channel_type: str): + return SimpleNamespace( + id=uuid.uuid4(), tenant_id=None, provider_type=channel_type + ) + + +def test_backfill_dingtalk_fills_external_and_unionid(): + svc = channel_user_service + member = _make_member() + svc._backfill_org_member_ids( + member, + channel_type="dingtalk", + external_user_id="staff-carol-777", + extra_info={"unionid": "UNION-CAROL", "mobile": "13800000001"}, + ) + assert member.external_id == "staff-carol-777" + assert member.unionid == "UNION-CAROL" + + +def test_backfill_dingtalk_does_not_overwrite_existing(): + svc = channel_user_service + member = _make_member(external_id="existing-staff", unionid="existing-union") + svc._backfill_org_member_ids( + member, + channel_type="dingtalk", + external_user_id="staff-new", + extra_info={"unionid": "UNION-NEW"}, + ) + assert member.external_id == "existing-staff" + assert member.unionid == "existing-union" + + +def test_backfill_feishu_on_prefix_goes_to_unionid(): + svc = channel_user_service + member = _make_member() + svc._backfill_org_member_ids( + member, + channel_type="feishu", + external_user_id="on_unionid_xxx", + extra_info={}, + ) + assert member.unionid == "on_unionid_xxx" + + +def test_backfill_feishu_ou_prefix_goes_to_openid(): + svc = channel_user_service + member = _make_member() + svc._backfill_org_member_ids( + member, + channel_type="feishu", + external_user_id="ou_openid_xxx", + extra_info={}, + ) + assert member.open_id == "ou_openid_xxx" + + +def test_backfill_wecom_only_fills_external_id(): + svc = channel_user_service + member = _make_member() + svc._backfill_org_member_ids( + member, + channel_type="wecom", + external_user_id="userid-wecom-1", + extra_info={"unionid": "ignored-for-wecom"}, + ) + assert member.external_id == "userid-wecom-1" + assert member.unionid is None + + +async def test_reuse_existing_org_member_triggers_backfill( + fake_session, agent, patch_provider, monkeypatch +): + """email 命中 User → 找到 existing_member → 应回填 dingtalk 标识到 existing_member""" + matched_user = SimpleNamespace( + id=uuid.uuid4(), identity_id=None, + display_name="Carol", avatar_url=None, + ) + existing_member = _make_member(user_id=matched_user.id) + + async def _fake_find_none(self, db, provider_id, channel_type, candidate_ids): + return None + + async def _fake_match_email(db, email, tenant_id): + return matched_user + + async def _fake_match_mobile(db, mobile, tenant_id): + return None + + async def _fake_find_existing(self, db, user_id, provider_id, tenant_id): + return existing_member + + monkeypatch.setattr(cus_mod.ChannelUserService, "_find_org_member", _fake_find_none) + monkeypatch.setattr(cus_mod.sso_service, "match_user_by_email", _fake_match_email) + monkeypatch.setattr(cus_mod.sso_service, "match_user_by_mobile", _fake_match_mobile) + monkeypatch.setattr( + cus_mod.ChannelUserService, "_find_existing_org_member_for_user", + _fake_find_existing, + ) + + async def _get_none(model, key): + return None + monkeypatch.setattr(fake_session, "get", _get_none, raising=False) + + await channel_user_service.resolve_channel_user( + db=fake_session, + agent=agent, + channel_type="dingtalk", + external_user_id="staff-carol-777", + extra_info={ + "unionid": "UNION-CAROL", + "mobile": "13800000001", + "email": "carol@example.com", + "name": "Carol", + }, + extra_ids=["UNION-CAROL"], + ) + + assert existing_member.external_id == "staff-carol-777" + assert existing_member.unionid == "UNION-CAROL" + + +async def test_enrich_skips_phone_when_other_identity_uses_it(fake_session, monkeypatch): + """Pre-check: if another Identity already has the phone, skip instead of raising.""" + from app.services.channel_user_service import channel_user_service as svc + from app.services import channel_user_service as cus_mod + + current_identity = SimpleNamespace( + id=uuid.uuid4(), phone=None, email=None, + ) + user = SimpleNamespace( + id=uuid.uuid4(), identity_id=current_identity.id, + display_name=None, avatar_url=None, + ) + + async def _fake_get(model, key): + assert key == current_identity.id + return current_identity + monkeypatch.setattr(fake_session, "get", _fake_get, raising=False) + + # Simulate "another identity has this phone": execute returns a truthy row + other_identity_id = uuid.uuid4() + + async def _fake_execute(stmt): + sql = str(stmt) + + class _R: + def scalar_one_or_none(self_inner): + # Return the other identity's id if the query is looking up + # identities by phone; else None. + if "identities.phone" in sql or "phone =" in sql.lower(): + return other_identity_id + return None + return _R() + + monkeypatch.setattr(fake_session, "execute", _fake_execute, raising=False) + + await svc._enrich_user_from_extra_info( + fake_session, user, {"mobile": "15703300627", "email": None, "name": None} + ) + + # Phone was NOT written, no exception raised + assert current_identity.phone is None + + +async def test_enrich_skips_email_when_other_identity_uses_it(fake_session, monkeypatch): + from app.services.channel_user_service import channel_user_service as svc + + current_identity = SimpleNamespace( + id=uuid.uuid4(), phone=None, email=None, + ) + user = SimpleNamespace( + id=uuid.uuid4(), identity_id=current_identity.id, + display_name=None, avatar_url=None, + ) + + async def _fake_get(model, key): + return current_identity + monkeypatch.setattr(fake_session, "get", _fake_get, raising=False) + + async def _fake_execute(stmt): + sql = str(stmt) + + class _R: + def scalar_one_or_none(self_inner): + if "identities.email" in sql or "email =" in sql.lower(): + return uuid.uuid4() + return None + return _R() + monkeypatch.setattr(fake_session, "execute", _fake_execute, raising=False) + + await svc._enrich_user_from_extra_info( + fake_session, user, + {"mobile": None, "email": "dup@example.com", "name": None}, + ) + + assert current_identity.email is None + + +async def test_enrich_writes_phone_when_no_conflict(fake_session, monkeypatch): + """Happy path: no other identity uses the phone → write succeeds.""" + from app.services.channel_user_service import channel_user_service as svc + + current_identity = SimpleNamespace( + id=uuid.uuid4(), phone=None, email=None, + ) + user = SimpleNamespace( + id=uuid.uuid4(), identity_id=current_identity.id, + display_name=None, avatar_url=None, + ) + + async def _fake_get(model, key): + return current_identity + monkeypatch.setattr(fake_session, "get", _fake_get, raising=False) + + async def _fake_execute(stmt): + class _R: + def scalar_one_or_none(self_inner): + return None # no conflict + return _R() + monkeypatch.setattr(fake_session, "execute", _fake_execute, raising=False) + + await svc._enrich_user_from_extra_info( + fake_session, user, {"mobile": "13800000000", "email": None, "name": None} + ) + + assert current_identity.phone == "13800000000" + + +async def test_resolve_continues_when_enrich_raises( + fake_session, agent, patch_provider, monkeypatch +): + """Isolation: even if _enrich raises unexpectedly, resolve still returns the user.""" + from app.services.channel_user_service import channel_user_service as svc + from app.services import channel_user_service as cus_mod + + matched_user = SimpleNamespace( + id=uuid.uuid4(), identity_id=uuid.uuid4(), + display_name=None, avatar_url=None, + ) + + async def _find_linked(self, db, provider_id, channel_type, candidate_ids): + # Return a member already linked to matched_user → Case 1 branch + return SimpleNamespace(id=uuid.uuid4(), user_id=matched_user.id) + + monkeypatch.setattr(cus_mod.ChannelUserService, "_find_org_member", _find_linked) + + async def _db_get(model, key): + if key == matched_user.id: + return matched_user + return None + monkeypatch.setattr(fake_session, "get", _db_get, raising=False) + + async def _boom(self, db, user, extra_info): + raise RuntimeError("simulated enrichment failure") + monkeypatch.setattr( + cus_mod.ChannelUserService, "_enrich_user_from_extra_info", _boom + ) + + # resolve_channel_user should catch the enrichment error and still return the user + result = await svc.resolve_channel_user( + db=fake_session, agent=agent, channel_type="dingtalk", + external_user_id="staff-xyz", + extra_info={"mobile": "13900000000", "email": "x@y.com"}, + ) + assert result.id == matched_user.id + + +async def test_reuse_existing_org_member_triggers_backfill_feishu( + fake_session, agent, patch_provider, monkeypatch +): + """Feishu reuse: extra_info has email + unionid; external_user_id is open_id (ou_...) + → backfill should populate open_id (from prefix), external_id, and unionid (from extra_info) + on the reused OrgMember. + """ + import app.services.channel_user_service as cus_mod + + async def _fake_ensure_feishu(self, db, ptype, tid): + return _fake_provider_for("feishu") + + monkeypatch.setattr(cus_mod.ChannelUserService, "_ensure_provider", _fake_ensure_feishu) + + matched_user = SimpleNamespace( + id=uuid.uuid4(), identity_id=None, + display_name=None, avatar_url=None, + ) + existing_member = _make_member(user_id=matched_user.id) + + async def _fake_find_none(self, db, provider_id, channel_type, candidate_ids): + return None + + async def _fake_match_email(db, email, tenant_id): + return matched_user + + async def _fake_match_mobile(db, mobile, tenant_id): + return None + + async def _fake_find_existing(self, db, user_id, provider_id, tenant_id): + return existing_member + + monkeypatch.setattr(cus_mod.ChannelUserService, "_find_org_member", _fake_find_none) + monkeypatch.setattr(cus_mod.sso_service, "match_user_by_email", _fake_match_email) + monkeypatch.setattr(cus_mod.sso_service, "match_user_by_mobile", _fake_match_mobile) + monkeypatch.setattr( + cus_mod.ChannelUserService, + "_find_existing_org_member_for_user", + _fake_find_existing, + ) + + async def _get_none(model, key): + return None + monkeypatch.setattr(fake_session, "get", _get_none, raising=False) + + await channel_user_service.resolve_channel_user( + db=fake_session, + agent=agent, + channel_type="feishu", + external_user_id="ou_abc123xyz", # Feishu open_id prefix + extra_info={ + "unionid": "UNION-FEISHU-X", + "email": "feishu-user@example.com", + "name": "Feishu User", + }, + ) + + # Backfill assertions — open_id from prefix, external_id from external_user_id, + # unionid from extra_info + assert existing_member.open_id == "ou_abc123xyz" + assert existing_member.external_id == "ou_abc123xyz" + assert existing_member.unionid == "UNION-FEISHU-X" + + +async def test_reuse_existing_org_member_triggers_backfill_wecom( + fake_session, agent, patch_provider, monkeypatch +): + """WeCom reuse: only external_id should be filled (from userid). + unionid in extra_info must NOT be written to the member (wecom doesn't track unionid + on OrgMember). + """ + import app.services.channel_user_service as cus_mod + + async def _fake_ensure_wecom(self, db, ptype, tid): + return _fake_provider_for("wecom") + + monkeypatch.setattr(cus_mod.ChannelUserService, "_ensure_provider", _fake_ensure_wecom) + + matched_user = SimpleNamespace( + id=uuid.uuid4(), identity_id=None, + display_name=None, avatar_url=None, + ) + existing_member = _make_member(user_id=matched_user.id) + + async def _fake_find_none(self, db, provider_id, channel_type, candidate_ids): + return None + + async def _fake_match_email(db, email, tenant_id): + return matched_user + + async def _fake_match_mobile(db, mobile, tenant_id): + return None + + async def _fake_find_existing(self, db, user_id, provider_id, tenant_id): + return existing_member + + monkeypatch.setattr(cus_mod.ChannelUserService, "_find_org_member", _fake_find_none) + monkeypatch.setattr(cus_mod.sso_service, "match_user_by_email", _fake_match_email) + monkeypatch.setattr(cus_mod.sso_service, "match_user_by_mobile", _fake_match_mobile) + monkeypatch.setattr( + cus_mod.ChannelUserService, + "_find_existing_org_member_for_user", + _fake_find_existing, + ) + + async def _get_none(model, key): + return None + monkeypatch.setattr(fake_session, "get", _get_none, raising=False) + + await channel_user_service.resolve_channel_user( + db=fake_session, + agent=agent, + channel_type="wecom", + external_user_id="wecom-userid-42", + extra_info={ + "unionid": "SHOULD-BE-IGNORED", # wecom doesn't write unionid on member + "email": "wecom-user@example.com", + "name": "WeCom User", + }, + ) + + assert existing_member.external_id == "wecom-userid-42" + assert existing_member.unionid is None # not written for wecom + assert existing_member.open_id is None # not written for wecom