From 2641edb3ec030c1a3d5e7367cc862915857045f3 Mon Sep 17 00:00:00 2001 From: Y1fe1Zh0u Date: Tue, 14 Apr 2026 01:07:12 +0800 Subject: [PATCH 1/2] Establish shared Feishu group transcript as the first step toward multi-agent group chat The long-term goal is Feishu-as-channel multi-agent collaboration, where several agents can participate in the same group conversation and react to each other's public replies instead of behaving like isolated one-on-one bots. This commit intentionally implements only the first slice of that design: shared public transcript reading for Feishu group chats. It keeps persistence per-agent, but changes prompt construction so agents in the same Feishu group read a unified public history keyed by external conversation id. It also adds channel-native message ids for deterministic dedupe and persists outbound Feishu message ids where available so transcript replay is more stable. Constraint: Current chat/session model is still pairwise by agent_id and external_conv_id Constraint: Need a reversible first slice before adding routing, throttling, or loop control Rejected: Full channel/member/message schema in the first change | too broad for initial rollout Rejected: Prompt-only coordination without shared transcript | agents would still miss each other's public replies Rejected: Text-and-time-bucket-only dedupe | insufficiently deterministic for repeated webhook fanout Confidence: medium Scope-risk: moderate Reversibility: clean Directive: Treat this as step 1 only; follow-up work still needs mention routing, bot loop filtering, and stronger group scheduling Tested: PYTHONPATH=backend python3 -m unittest backend.tests.test_channel_session_shared_history Tested: PYTHONPYCACHEPREFIX=/tmp/pycache python3.13 -m py_compile backend/app/api/feishu.py backend/app/services/channel_session.py backend/app/models/audit.py backend/alembic/versions/add_chat_message_external_message_id.py backend/tests/test_channel_session_shared_history.py Tested: Simulated 
shared-group transcript read across two agent sessions in local Docker-backed test DB Not-tested: Real multi-bot Feishu group end-to-end conversation in a live group Not-tested: Mention routing, spam control, and bot loop behavior --- .../add_chat_message_external_message_id.py | 27 + backend/app/api/feishu.py | 112 +++- backend/app/models/audit.py | 1 + backend/app/services/channel_session.py | 127 ++++- .../test_channel_session_shared_history.py | 534 ++++++++++++++++++ 5 files changed, 771 insertions(+), 30 deletions(-) create mode 100644 backend/alembic/versions/add_chat_message_external_message_id.py create mode 100644 backend/tests/test_channel_session_shared_history.py diff --git a/backend/alembic/versions/add_chat_message_external_message_id.py b/backend/alembic/versions/add_chat_message_external_message_id.py new file mode 100644 index 000000000..54b93515b --- /dev/null +++ b/backend/alembic/versions/add_chat_message_external_message_id.py @@ -0,0 +1,27 @@ +"""Add external_message_id to chat_messages for channel-native dedupe. 
+ +Revision ID: add_chat_msg_ext_id +Revises: add_microsoft_teams_support +""" + +from alembic import op + + +revision = "add_chat_msg_ext_id" +down_revision = "add_microsoft_teams_support" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.execute( + "ALTER TABLE chat_messages ADD COLUMN IF NOT EXISTS external_message_id VARCHAR(255)" + ) + op.execute( + "CREATE INDEX IF NOT EXISTS ix_chat_messages_external_message_id ON chat_messages(external_message_id)" + ) + + +def downgrade() -> None: + op.execute("DROP INDEX IF EXISTS ix_chat_messages_external_message_id") + op.execute("ALTER TABLE chat_messages DROP COLUMN IF EXISTS external_message_id") diff --git a/backend/app/api/feishu.py b/backend/app/api/feishu.py index aff83664a..5ea9f1341 100644 --- a/backend/app/api/feishu.py +++ b/backend/app/api/feishu.py @@ -67,6 +67,18 @@ async def drain(self) -> None: await self._tail +def _extract_feishu_message_id(response: dict | None) -> str | None: + """Extract message_id from Feishu send-message responses when available.""" + if not isinstance(response, dict): + return None + data = response.get("data") + if isinstance(data, dict): + message_id = data.get("message_id") + if message_id: + return str(message_id) + return None + + # ─── OAuth ────────────────────────────────────────────── from fastapi.responses import HTMLResponse, Response @@ -330,6 +342,7 @@ async def process_feishu_event(agent_id: uuid.UUID, body: dict, db: AsyncSession if event_type == "im.message.receive_v1": message = event.get("message", {}) + message_id = message.get("message_id", "") sender = event.get("sender", {}).get("sender_id", {}) sender_open_id = sender.get("open_id", "") sender_user_id_from_event = sender.get("user_id", "") # tenant-stable ID, available directly in event body @@ -439,30 +452,40 @@ async def process_feishu_event(agent_id: uuid.UUID, body: dict, db: AsyncSession # Load recent conversation history via session (session UUID may already exist) from 
app.models.audit import ChatMessage from app.models.agent import Agent as AgentModel - from app.services.channel_session import find_or_create_channel_session + from app.models.chat_session import ChatSession as _ChatSession + from app.services.channel_session import find_or_create_channel_session, load_shared_channel_history agent_r = await db.execute(select(AgentModel).where(AgentModel.id == agent_id)) agent_obj = agent_r.scalar_one_or_none() creator_id = agent_obj.creator_id if agent_obj else agent_id from app.models.agent import DEFAULT_CONTEXT_WINDOW_SIZE ctx_size = (agent_obj.context_window_size or DEFAULT_CONTEXT_WINDOW_SIZE) if agent_obj else DEFAULT_CONTEXT_WINDOW_SIZE - # Pre-resolve session so history lookup uses the UUID (session created later if new) - _pre_sess_r = await db.execute( - select(__import__('app.models.chat_session', fromlist=['ChatSession']).ChatSession).where( - __import__('app.models.chat_session', fromlist=['ChatSession']).ChatSession.agent_id == agent_id, - __import__('app.models.chat_session', fromlist=['ChatSession']).ChatSession.external_conv_id == conv_id, + if chat_type == "group" and chat_id: + history = await load_shared_channel_history( + db, + current_agent_id=agent_id, + external_conv_id=conv_id, + source_channel="feishu", + limit=ctx_size, ) - ) - _pre_sess = _pre_sess_r.scalar_one_or_none() - _history_conv_id = str(_pre_sess.id) if _pre_sess else conv_id - history_result = await db.execute( - select(ChatMessage) - .where(ChatMessage.agent_id == agent_id, ChatMessage.conversation_id == _history_conv_id) - .order_by(ChatMessage.created_at.desc()) - .limit(ctx_size) - ) - history_msgs = history_result.scalars().all() - history = [{"role": m.role, "content": m.content} for m in reversed(history_msgs)] + else: + # Keep P2P behavior unchanged: use the current agent's own session history. 
+ _pre_sess_r = await db.execute( + select(_ChatSession).where( + _ChatSession.agent_id == agent_id, + _ChatSession.external_conv_id == conv_id, + ) + ) + _pre_sess = _pre_sess_r.scalar_one_or_none() + _history_conv_id = str(_pre_sess.id) if _pre_sess else conv_id + history_result = await db.execute( + select(ChatMessage) + .where(ChatMessage.agent_id == agent_id, ChatMessage.conversation_id == _history_conv_id) + .order_by(ChatMessage.created_at.desc()) + .limit(ctx_size) + ) + history_msgs = history_result.scalars().all() + history = [{"role": m.role, "content": m.content} for m in reversed(history_msgs)] # --- Resolve Feishu sender identity & find/create platform user --- import uuid as _uuid @@ -577,7 +600,16 @@ async def process_feishu_event(agent_id: uuid.UUID, body: dict, db: AsyncSession session_conv_id = str(_sess.id) # Save user message - db.add(ChatMessage(agent_id=agent_id, user_id=platform_user_id, role="user", content=user_text, conversation_id=session_conv_id)) + db.add( + ChatMessage( + agent_id=agent_id, + user_id=platform_user_id, + role="user", + content=user_text, + conversation_id=session_conv_id, + external_message_id=message_id or None, + ) + ) _sess.last_message_at = _dt.now(_tz.utc) await db.commit() @@ -1066,7 +1098,18 @@ async def _heartbeat(): logger.error(f"[Feishu] Failed to create task: {e}") # Save assistant reply to history (use platform_user_id so messages stay in one session) - db.add(ChatMessage(agent_id=agent_id, user_id=platform_user_id, role="assistant", content=reply_text, conversation_id=session_conv_id)) + db.add( + ChatMessage( + agent_id=agent_id, + user_id=platform_user_id, + role="assistant", + content=reply_text, + conversation_id=session_conv_id, + # CardKit flow does not currently expose a stable message_id here. + # IM patch fallback does, so persist it when available. 
+ external_message_id=msg_id_for_patch or None, + ) + ) _sess.last_message_at = _dt.now(_tz.utc) await db.commit() @@ -1233,9 +1276,16 @@ async def _handle_feishu_file(db, agent_id, config, message, sender_open_id, cha user_msg_content = f"[用户发送了图片]\n{_image_marker}" else: user_msg_content = f"[file:{filename}]" - db.add(ChatMessage(agent_id=agent_id, user_id=platform_user_id, role="user", - content=user_msg_content if msg_type != "image" else f"[file:{filename}]", - conversation_id=session_conv_id)) + db.add( + ChatMessage( + agent_id=agent_id, + user_id=platform_user_id, + role="user", + content=user_msg_content if msg_type != "image" else f"[file:{filename}]", + conversation_id=session_conv_id, + external_message_id=message_id or None, + ) + ) _sess.last_message_at = _dt.now(_tz.utc) # Load conversation history for LLM context @@ -1399,24 +1449,34 @@ async def _img_heartbeat(): await asyncio.sleep(random.uniform(1.0, 2.0)) ack = random.choice(_FILE_ACK_MESSAGES) + ack_message_id = None try: if chat_type == "group" and chat_id: - await feishu_service.send_message( + _ack_resp = await feishu_service.send_message( config.app_id, config.app_secret, chat_id, "text", json.dumps({"text": ack}), receive_id_type="chat_id", ) else: - await feishu_service.send_message( + _ack_resp = await feishu_service.send_message( config.app_id, config.app_secret, sender_open_id, "text", json.dumps({"text": ack}), ) + ack_message_id = _extract_feishu_message_id(_ack_resp) except Exception as e: logger.error(f"[Feishu] Failed to send ack: {e}") # Store ack in DB async with _async_session() as db2: - db2.add(ChatMessage(agent_id=agent_id, user_id=platform_user_id, role="assistant", - content=ack, conversation_id=session_conv_id)) + db2.add( + ChatMessage( + agent_id=agent_id, + user_id=platform_user_id, + role="assistant", + content=ack, + conversation_id=session_conv_id, + external_message_id=ack_message_id, + ) + ) await db2.commit() diff --git a/backend/app/models/audit.py 
b/backend/app/models/audit.py index 52db3563d..de3899faa 100644 --- a/backend/app/models/audit.py +++ b/backend/app/models/audit.py @@ -57,6 +57,7 @@ class ChatMessage(Base): ) content: Mapped[str] = mapped_column(Text, nullable=False) conversation_id: Mapped[str] = mapped_column(String(200), default="web", nullable=False) + external_message_id: Mapped[str | None] = mapped_column(String(255), nullable=True, index=True) # Participant identity (unified User/Agent identity) participant_id: Mapped[uuid.UUID | None] = mapped_column(UUID(as_uuid=True), ForeignKey("participants.id"), nullable=True) # Model thinking process diff --git a/backend/app/services/channel_session.py b/backend/app/services/channel_session.py index 5b6b7de40..ef5dd8c43 100644 --- a/backend/app/services/channel_session.py +++ b/backend/app/services/channel_session.py @@ -1,7 +1,4 @@ -"""Shared helper: find-or-create ChatSession by external channel conv_id. - -Used by feishu.py, slack.py, discord_bot.py, wecom.py, teams.py — eliminates in-process caches. -""" +"""Shared helpers for external channel sessions and shared public transcript reads.""" import uuid as _uuid from datetime import datetime, timezone @@ -65,3 +62,125 @@ async def find_or_create_channel_session( session.title = group_name[:40] return session + + +def _normalize_shared_channel_history( + messages: list, + current_agent_id: _uuid.UUID, + user_names: dict[_uuid.UUID, str], + agent_names: dict[_uuid.UUID, str], + limit: int, +) -> list[dict]: + """Normalize raw group messages into prompt-safe shared transcript entries. + + - Human messages remain `user` messages with explicit speaker labels. + - The current agent's prior replies remain `assistant` messages. + - Other agents' prior replies are converted into visible public transcript + lines as `user` messages so they are not mistaken for the current + assistant's own prior output. 
+ - Duplicate public human messages from multiple per-agent sessions are + deduplicated within a short time bucket. + """ + seen: set[tuple] = set() + external_message_positions: dict[str, int] = {} + normalized: list[dict] = [] + + for message in messages: + if message.role not in ("user", "assistant"): + continue + + if message.role == "assistant": + speaker_name = agent_names.get(message.agent_id, "未知智能体") + if message.agent_id == current_agent_id: + entry = {"role": "assistant", "content": message.content} + else: + entry = {"role": "user", "content": f"[其他智能体 {speaker_name}] {message.content}"} + dedupe_speaker = ("assistant", message.agent_id) + entry_priority = 2 + else: + speaker_name = user_names.get(message.user_id, "群成员") + entry = {"role": "user", "content": f"[群成员 {speaker_name}] {message.content}"} + dedupe_speaker = ("user", message.user_id) + entry_priority = 1 + + external_message_id = getattr(message, "external_message_id", None) + created_at = getattr(message, "created_at", None) + bucket = int(created_at.timestamp() // 10) if created_at else None + if external_message_id: + existing_pos = external_message_positions.get(external_message_id) + if existing_pos is None: + external_message_positions[external_message_id] = len(normalized) + normalized.append({**entry, "_priority": entry_priority}) + else: + if entry_priority > normalized[existing_pos].get("_priority", 0): + normalized[existing_pos] = {**entry, "_priority": entry_priority} + continue + else: + dedupe_key = (dedupe_speaker, entry["content"], bucket) + if dedupe_key in seen: + continue + seen.add(dedupe_key) + normalized.append({**entry, "_priority": entry_priority}) + + if len(normalized) > limit: + normalized = normalized[-limit:] + return [{"role": entry["role"], "content": entry["content"]} for entry in normalized] + + +async def load_shared_channel_history( + db: AsyncSession, + *, + current_agent_id: _uuid.UUID, + external_conv_id: str, + source_channel: str, + limit: int = 100, +) 
-> list[dict]: + """Load shared public history across all sessions of the same external chat. + + This is intentionally scoped to public channel context. Persistence remains + per-agent, but prompt construction for shared channels can read a unified + transcript keyed by `(source_channel, external_conv_id)`. + """ + from app.models.agent import Agent + from app.models.audit import ChatMessage + from app.models.user import User + + sessions_result = await db.execute( + select(ChatSession.id).where( + ChatSession.external_conv_id == external_conv_id, + ChatSession.source_channel == source_channel, + ) + ) + session_ids = [str(row[0]) for row in sessions_result.fetchall()] + if not session_ids: + return [] + + raw_limit = max(limit * 4, limit) + messages_result = await db.execute( + select(ChatMessage) + .where(ChatMessage.conversation_id.in_(session_ids)) + .order_by(ChatMessage.created_at.desc()) + .limit(raw_limit) + ) + messages = list(reversed(messages_result.scalars().all())) + if not messages: + return [] + + user_ids = {m.user_id for m in messages if getattr(m, "user_id", None)} + agent_ids = {m.agent_id for m in messages if getattr(m, "agent_id", None)} + + user_names: dict[_uuid.UUID, str] = {} + if user_ids: + user_result = await db.execute( + select(User.id, User.display_name, User.username).where(User.id.in_(user_ids)) + ) + for user_id, display_name, username in user_result.fetchall(): + user_names[user_id] = display_name or username or "群成员" + + agent_names: dict[_uuid.UUID, str] = {} + if agent_ids: + agent_result = await db.execute(select(Agent.id, Agent.name).where(Agent.id.in_(agent_ids))) + for agent_id, name in agent_result.fetchall(): + agent_names[agent_id] = name or "未知智能体" + + return _normalize_shared_channel_history(messages, current_agent_id, user_names, agent_names, limit) diff --git a/backend/tests/test_channel_session_shared_history.py b/backend/tests/test_channel_session_shared_history.py new file mode 100644 index 000000000..382efdf38 
--- /dev/null +++ b/backend/tests/test_channel_session_shared_history.py @@ -0,0 +1,534 @@ +import sys +import types +import unittest +import uuid +from collections.abc import Iterable +from datetime import datetime, timedelta, timezone +from types import SimpleNamespace + + +class _FakeColumn: + def __init__(self, name: str): + self.name = name + + def in_(self, values: Iterable): + return ("in", self.name, tuple(values)) + + def __eq__(self, other): + return ("eq", self.name, other) + + def desc(self): + return ("desc", self.name) + + +class _FakeQuery: + def __init__(self, *items): + self.items = items + self.steps = [] + + def where(self, *conditions): + self.steps.append(("where", conditions)) + return self + + def order_by(self, *clauses): + self.steps.append(("order_by", clauses)) + return self + + def limit(self, value): + self.steps.append(("limit", value)) + return self + + +if "sqlalchemy" not in sys.modules: + fake_sqlalchemy = types.ModuleType("sqlalchemy") + + def _fake_select(*args, **kwargs): + return _FakeQuery(*args) + + fake_sqlalchemy.select = _fake_select + sys.modules["sqlalchemy"] = fake_sqlalchemy + +if "sqlalchemy.ext" not in sys.modules: + sys.modules["sqlalchemy.ext"] = types.ModuleType("sqlalchemy.ext") + +if "sqlalchemy.ext.asyncio" not in sys.modules: + fake_asyncio = types.ModuleType("sqlalchemy.ext.asyncio") + + class _FakeAsyncSession: + pass + + fake_asyncio.AsyncSession = _FakeAsyncSession + sys.modules["sqlalchemy.ext.asyncio"] = fake_asyncio + +if "app.models.chat_session" not in sys.modules: + fake_chat_session = types.ModuleType("app.models.chat_session") + + class _FakeChatSession: + id = _FakeColumn("id") + external_conv_id = _FakeColumn("external_conv_id") + source_channel = _FakeColumn("source_channel") + + fake_chat_session.ChatSession = _FakeChatSession + sys.modules["app.models.chat_session"] = fake_chat_session + +if "app.models.user" not in sys.modules: + fake_user = types.ModuleType("app.models.user") + + class 
_FakeUser: + id = _FakeColumn("id") + display_name = _FakeColumn("display_name") + username = _FakeColumn("username") + + fake_user.User = _FakeUser + sys.modules["app.models.user"] = fake_user + +if "app.models.agent" not in sys.modules: + fake_agent = types.ModuleType("app.models.agent") + + class _FakeAgent: + id = _FakeColumn("id") + name = _FakeColumn("name") + + fake_agent.Agent = _FakeAgent + sys.modules["app.models.agent"] = fake_agent + +if "app.models.audit" not in sys.modules: + fake_audit = types.ModuleType("app.models.audit") + + class _FakeChatMessage: + conversation_id = _FakeColumn("conversation_id") + created_at = _FakeColumn("created_at") + user_id = _FakeColumn("user_id") + agent_id = _FakeColumn("agent_id") + + fake_audit.ChatMessage = _FakeChatMessage + sys.modules["app.models.audit"] = fake_audit + + +from app.services.channel_session import _normalize_shared_channel_history, load_shared_channel_history + + +class _FakeScalarResult: + def __init__(self, values): + self._values = list(values) + + def all(self): + return list(self._values) + + +class _FakeResult: + def __init__(self, *, fetchall_values=None, scalar_values=None): + self._fetchall_values = list(fetchall_values or []) + self._scalar_values = list(scalar_values or []) + + def fetchall(self): + return list(self._fetchall_values) + + def scalars(self): + return _FakeScalarResult(self._scalar_values) + + +class _FakeAsyncSession: + def __init__(self, results): + self._results = list(results) + + async def execute(self, _query): + if not self._results: + raise AssertionError("Unexpected extra DB execute() call") + return self._results.pop(0) + + +class SharedChannelHistoryTests(unittest.TestCase): + def test_empty_history_returns_empty_list(self): + result = _normalize_shared_channel_history( + [], + current_agent_id=uuid.uuid4(), + user_names={}, + agent_names={}, + limit=20, + ) + + self.assertEqual(result, []) + + def test_other_agents_are_injected_as_public_transcript(self): + 
current_agent_id = uuid.uuid4() + other_agent_id = uuid.uuid4() + user_id = uuid.uuid4() + now = datetime.now(timezone.utc) + + messages = [ + SimpleNamespace( + role="user", + user_id=user_id, + agent_id=current_agent_id, + content="大家帮我看一下这个 PR", + created_at=now, + external_message_id="om_user_1", + ), + SimpleNamespace( + role="assistant", + user_id=user_id, + agent_id=other_agent_id, + content="我先看 migration 风险。", + created_at=now + timedelta(seconds=1), + external_message_id=None, + ), + SimpleNamespace( + role="assistant", + user_id=user_id, + agent_id=current_agent_id, + content="我来检查 API 改动。", + created_at=now + timedelta(seconds=2), + external_message_id=None, + ), + ] + + result = _normalize_shared_channel_history( + messages, + current_agent_id=current_agent_id, + user_names={user_id: "Yifei"}, + agent_names={current_agent_id: "Dev-Agent", other_agent_id: "Review-Agent"}, + limit=20, + ) + + self.assertEqual( + result, + [ + {"role": "user", "content": "[群成员 Yifei] 大家帮我看一下这个 PR"}, + {"role": "user", "content": "[其他智能体 Review-Agent] 我先看 migration 风险。"}, + {"role": "assistant", "content": "我来检查 API 改动。"}, + ], + ) + + def test_duplicate_human_messages_with_same_external_message_id_are_deduped(self): + current_agent_id = uuid.uuid4() + user_id = uuid.uuid4() + now = datetime.now(timezone.utc) + + messages = [ + SimpleNamespace( + role="user", + user_id=user_id, + agent_id=uuid.uuid4(), + content="同一条群消息", + created_at=now, + external_message_id="om_shared_1", + ), + SimpleNamespace( + role="user", + user_id=user_id, + agent_id=uuid.uuid4(), + content="同一条群消息", + created_at=now + timedelta(seconds=2), + external_message_id="om_shared_1", + ), + ] + + result = _normalize_shared_channel_history( + messages, + current_agent_id=current_agent_id, + user_names={user_id: "Alice"}, + agent_names={}, + limit=20, + ) + + self.assertEqual(result, [{"role": "user", "content": "[群成员 Alice] 同一条群消息"}]) + + def 
test_text_bucket_fallback_still_dedupes_when_external_message_id_is_missing(self): + current_agent_id = uuid.uuid4() + user_id = uuid.uuid4() + now = datetime(2026, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + + messages = [ + SimpleNamespace( + role="user", + user_id=user_id, + agent_id=uuid.uuid4(), + content="没有 message id 的旧消息", + created_at=now, + external_message_id=None, + ), + SimpleNamespace( + role="user", + user_id=user_id, + agent_id=uuid.uuid4(), + content="没有 message id 的旧消息", + created_at=now + timedelta(seconds=4), + external_message_id=None, + ), + ] + + result = _normalize_shared_channel_history( + messages, + current_agent_id=current_agent_id, + user_names={user_id: "Alice"}, + agent_names={}, + limit=20, + ) + + self.assertEqual(result, [{"role": "user", "content": "[群成员 Alice] 没有 message id 的旧消息"}]) + + def test_non_chat_roles_are_skipped(self): + current_agent_id = uuid.uuid4() + user_id = uuid.uuid4() + now = datetime.now(timezone.utc) + + messages = [ + SimpleNamespace( + role="tool_call", + user_id=user_id, + agent_id=current_agent_id, + content='{"name":"search"}', + created_at=now, + external_message_id=None, + ), + SimpleNamespace( + role="system", + user_id=user_id, + agent_id=current_agent_id, + content="internal note", + created_at=now + timedelta(seconds=1), + external_message_id=None, + ), + ] + + result = _normalize_shared_channel_history( + messages, + current_agent_id=current_agent_id, + user_names={user_id: "Alice"}, + agent_names={}, + limit=20, + ) + + self.assertEqual(result, []) + + def test_assistant_message_wins_when_same_external_message_id_is_seen_twice(self): + current_agent_id = uuid.uuid4() + other_agent_id = uuid.uuid4() + bot_user_id = uuid.uuid4() + now = datetime.now(timezone.utc) + + messages = [ + SimpleNamespace( + role="assistant", + user_id=bot_user_id, + agent_id=other_agent_id, + content="我来检查 migration。", + created_at=now, + external_message_id="om_bot_1", + ), + SimpleNamespace( + role="user", + 
user_id=bot_user_id, + agent_id=current_agent_id, + content="我来检查 migration。", + created_at=now + timedelta(seconds=1), + external_message_id="om_bot_1", + ), + ] + + result = _normalize_shared_channel_history( + messages, + current_agent_id=current_agent_id, + user_names={bot_user_id: "BotUser"}, + agent_names={other_agent_id: "Review-Agent"}, + limit=20, + ) + + self.assertEqual(result, [{"role": "user", "content": "[其他智能体 Review-Agent] 我来检查 migration。"}]) + + def test_missing_names_fall_back_to_generic_labels(self): + current_agent_id = uuid.uuid4() + other_agent_id = uuid.uuid4() + unknown_user_id = uuid.uuid4() + now = datetime.now(timezone.utc) + + messages = [ + SimpleNamespace( + role="user", + user_id=unknown_user_id, + agent_id=current_agent_id, + content="有人在吗", + created_at=now, + external_message_id="om_unknown_user_1", + ), + SimpleNamespace( + role="assistant", + user_id=unknown_user_id, + agent_id=other_agent_id, + content="我在。", + created_at=now + timedelta(seconds=1), + external_message_id=None, + ), + ] + + result = _normalize_shared_channel_history( + messages, + current_agent_id=current_agent_id, + user_names={}, + agent_names={}, + limit=20, + ) + + self.assertEqual( + result, + [ + {"role": "user", "content": "[群成员 群成员] 有人在吗"}, + {"role": "user", "content": "[其他智能体 未知智能体] 我在。"}, + ], + ) + + def test_limit_keeps_latest_entries_after_normalization(self): + current_agent_id = uuid.uuid4() + user_id = uuid.uuid4() + now = datetime.now(timezone.utc) + + messages = [ + SimpleNamespace( + role="user", + user_id=user_id, + agent_id=current_agent_id, + content="第一条", + created_at=now, + external_message_id="m1", + ), + SimpleNamespace( + role="user", + user_id=user_id, + agent_id=current_agent_id, + content="第二条", + created_at=now + timedelta(seconds=1), + external_message_id="m2", + ), + SimpleNamespace( + role="assistant", + user_id=user_id, + agent_id=current_agent_id, + content="第三条", + created_at=now + timedelta(seconds=2), + 
external_message_id="m3", + ), + ] + + result = _normalize_shared_channel_history( + messages, + current_agent_id=current_agent_id, + user_names={user_id: "Alice"}, + agent_names={current_agent_id: "Dev-Agent"}, + limit=2, + ) + + self.assertEqual( + result, + [ + {"role": "user", "content": "[群成员 Alice] 第二条"}, + {"role": "assistant", "content": "第三条"}, + ], + ) + + +class SharedChannelHistoryLoaderTests(unittest.IsolatedAsyncioTestCase): + async def test_loader_returns_empty_when_no_sessions_match(self): + db = _FakeAsyncSession([ + _FakeResult(fetchall_values=[]), + ]) + + result = await load_shared_channel_history( + db, + current_agent_id=uuid.uuid4(), + external_conv_id="feishu_group_missing", + source_channel="feishu", + limit=20, + ) + + self.assertEqual(result, []) + + async def test_loader_returns_empty_when_sessions_exist_but_no_messages(self): + db = _FakeAsyncSession([ + _FakeResult(fetchall_values=[(uuid.uuid4(),), (uuid.uuid4(),)]), + _FakeResult(scalar_values=[]), + ]) + + result = await load_shared_channel_history( + db, + current_agent_id=uuid.uuid4(), + external_conv_id="feishu_group_empty", + source_channel="feishu", + limit=20, + ) + + self.assertEqual(result, []) + + async def test_loader_merges_cross_agent_history_and_prefers_assistant_variant(self): + current_agent_id = uuid.uuid4() + other_agent_id = uuid.uuid4() + user_id = uuid.uuid4() + now = datetime(2026, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + + # Match the real DB query shape: ordered by created_at DESC before the + # loader reverses back to chronological order. 
+ messages = [ + SimpleNamespace( + role="user", + user_id=user_id, + agent_id=current_agent_id, + content="我先看 migration 风险。", + conversation_id="sess-a", + external_message_id="ext-bot-1", + created_at=now + timedelta(seconds=3), + ), + SimpleNamespace( + role="assistant", + user_id=user_id, + agent_id=other_agent_id, + content="我先看 migration 风险。", + conversation_id="sess-b", + external_message_id="ext-bot-1", + created_at=now + timedelta(seconds=2), + ), + SimpleNamespace( + role="user", + user_id=user_id, + agent_id=other_agent_id, + content="大家看看这个 PR", + conversation_id="sess-b", + external_message_id="ext-user-1", + created_at=now + timedelta(seconds=1), + ), + SimpleNamespace( + role="user", + user_id=user_id, + agent_id=current_agent_id, + content="大家看看这个 PR", + conversation_id="sess-a", + external_message_id="ext-user-1", + created_at=now, + ), + ] + + db = _FakeAsyncSession([ + _FakeResult(fetchall_values=[("sess-a",), ("sess-b",)]), + _FakeResult(scalar_values=messages), + _FakeResult(fetchall_values=[(user_id, "Admin", "admin")]), + _FakeResult(fetchall_values=[(current_agent_id, "Morty"), (other_agent_id, "Meeseeks")]), + ]) + + result = await load_shared_channel_history( + db, + current_agent_id=current_agent_id, + external_conv_id="feishu_group_demo", + source_channel="feishu", + limit=20, + ) + + self.assertEqual( + result, + [ + {"role": "user", "content": "[群成员 Admin] 大家看看这个 PR"}, + {"role": "user", "content": "[其他智能体 Meeseeks] 我先看 migration 风险。"}, + ], + ) + + +if __name__ == "__main__": + unittest.main() From 5e197215c02f0a292b9fcb77b539be16afd3734f Mon Sep 17 00:00:00 2001 From: Y1fe1Zh0u Date: Tue, 14 Apr 2026 19:56:35 +0800 Subject: [PATCH 2/2] Tighten transcript isolation for Feishu group history Scope Feishu shared-transcript reads to the current tenant and to group sessions only, so public transcript aggregation cannot mix unrelated tenant or session contexts.
Rebase the external_message_id migration onto the current migration chain head to avoid creating a parallel Alembic head. Constraint: Shared transcript is tenant-bound even when external conversation ids look globally stable Rejected: Rely on source_channel + external_conv_id alone | risks cross-tenant prompt contamination Rejected: Keep migration on add_microsoft_teams_support | would branch the migration graph on current main Confidence: high Scope-risk: narrow Reversibility: clean Directive: Future shared-channel readers should always carry tenant scope explicitly and stay limited to true group sessions Tested: PYTHONPATH=backend python3.13 -m unittest backend.tests.test_channel_session_shared_history Tested: PYTHONPYCACHEPREFIX=/tmp/pycache python3.13 -m py_compile backend/app/api/feishu.py backend/app/services/channel_session.py backend/alembic/versions/add_chat_message_external_message_id.py backend/tests/test_channel_session_shared_history.py Not-tested: Real multi-tenant Feishu deployment with overlapping external conversation ids --- .../add_chat_message_external_message_id.py | 2 +- backend/app/api/feishu.py | 1 + backend/app/services/channel_session.py | 10 +++++++- .../test_channel_session_shared_history.py | 24 +++++++++++++++++++ 4 files changed, 35 insertions(+), 2 deletions(-) diff --git a/backend/alembic/versions/add_chat_message_external_message_id.py b/backend/alembic/versions/add_chat_message_external_message_id.py index 54b93515b..37381c4e2 100644 --- a/backend/alembic/versions/add_chat_message_external_message_id.py +++ b/backend/alembic/versions/add_chat_message_external_message_id.py @@ -8,7 +8,7 @@ revision = "add_chat_msg_ext_id" -down_revision = "add_microsoft_teams_support" +down_revision = "20260313_column_modify" branch_labels = None depends_on = None diff --git a/backend/app/api/feishu.py b/backend/app/api/feishu.py index 5ea9f1341..71949a459 100644 --- a/backend/app/api/feishu.py +++ b/backend/app/api/feishu.py @@ -464,6 +464,7 @@ 
async def process_feishu_event(agent_id: uuid.UUID, body: dict, db: AsyncSession history = await load_shared_channel_history( db, current_agent_id=agent_id, + current_tenant_id=agent_obj.tenant_id if agent_obj else None, external_conv_id=conv_id, source_channel="feishu", limit=ctx_size, diff --git a/backend/app/services/channel_session.py b/backend/app/services/channel_session.py index ef5dd8c43..3683e14d4 100644 --- a/backend/app/services/channel_session.py +++ b/backend/app/services/channel_session.py @@ -131,6 +131,7 @@ async def load_shared_channel_history( db: AsyncSession, *, current_agent_id: _uuid.UUID, + current_tenant_id: _uuid.UUID | None, external_conv_id: str, source_channel: str, limit: int = 100, @@ -145,10 +146,17 @@ async def load_shared_channel_history( from app.models.audit import ChatMessage from app.models.user import User + if not current_tenant_id: + return [] + sessions_result = await db.execute( - select(ChatSession.id).where( + select(ChatSession.id) + .join(Agent, Agent.id == ChatSession.agent_id) + .where( ChatSession.external_conv_id == external_conv_id, ChatSession.source_channel == source_channel, + ChatSession.is_group == True, + Agent.tenant_id == current_tenant_id, ) ) session_ids = [str(row[0]) for row in sessions_result.fetchall()] diff --git a/backend/tests/test_channel_session_shared_history.py b/backend/tests/test_channel_session_shared_history.py index 382efdf38..35d213f5a 100644 --- a/backend/tests/test_channel_session_shared_history.py +++ b/backend/tests/test_channel_session_shared_history.py @@ -26,6 +26,10 @@ def __init__(self, *items): self.items = items self.steps = [] + def join(self, *args): + self.steps.append(("join", args)) + return self + def where(self, *conditions): self.steps.append(("where", conditions)) return self @@ -67,6 +71,8 @@ class _FakeChatSession: id = _FakeColumn("id") external_conv_id = _FakeColumn("external_conv_id") source_channel = _FakeColumn("source_channel") + agent_id = 
_FakeColumn("agent_id") + is_group = _FakeColumn("is_group") fake_chat_session.ChatSession = _FakeChatSession sys.modules["app.models.chat_session"] = fake_chat_session @@ -88,6 +94,7 @@ class _FakeUser: class _FakeAgent: id = _FakeColumn("id") name = _FakeColumn("name") + tenant_id = _FakeColumn("tenant_id") fake_agent.Agent = _FakeAgent sys.modules["app.models.agent"] = fake_agent @@ -436,6 +443,7 @@ async def test_loader_returns_empty_when_no_sessions_match(self): result = await load_shared_channel_history( db, current_agent_id=uuid.uuid4(), + current_tenant_id=uuid.uuid4(), external_conv_id="feishu_group_missing", source_channel="feishu", limit=20, @@ -452,6 +460,7 @@ async def test_loader_returns_empty_when_sessions_exist_but_no_messages(self): result = await load_shared_channel_history( db, current_agent_id=uuid.uuid4(), + current_tenant_id=uuid.uuid4(), external_conv_id="feishu_group_empty", source_channel="feishu", limit=20, @@ -516,6 +525,7 @@ async def test_loader_merges_cross_agent_history_and_prefers_assistant_variant(s result = await load_shared_channel_history( db, current_agent_id=current_agent_id, + current_tenant_id=uuid.uuid4(), external_conv_id="feishu_group_demo", source_channel="feishu", limit=20, @@ -529,6 +539,20 @@ async def test_loader_merges_cross_agent_history_and_prefers_assistant_variant(s ], ) + async def test_loader_returns_empty_when_tenant_scope_is_missing(self): + db = _FakeAsyncSession([]) + + result = await load_shared_channel_history( + db, + current_agent_id=uuid.uuid4(), + current_tenant_id=None, + external_conv_id="feishu_group_demo", + source_channel="feishu", + limit=20, + ) + + self.assertEqual(result, []) + if __name__ == "__main__": unittest.main()