-
Notifications
You must be signed in to change notification settings - Fork 506
Establish shared Feishu group transcript as the first step toward multi-agent group chat #399
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,27 @@ | ||
| """Add external_message_id to chat_messages for channel-native dedupe. | ||
|
|
||
| Revision ID: add_chat_msg_ext_id | ||
| Revises: add_microsoft_teams_support | ||
| """ | ||
|
|
||
| from alembic import op | ||
|
|
||
|
|
||
| revision = "add_chat_msg_ext_id" | ||
| down_revision = "20260313_column_modify" | ||
| branch_labels = None | ||
| depends_on = None | ||
|
|
||
|
|
||
| def upgrade() -> None: | ||
| op.execute( | ||
| "ALTER TABLE chat_messages ADD COLUMN IF NOT EXISTS external_message_id VARCHAR(255)" | ||
| ) | ||
| op.execute( | ||
| "CREATE INDEX IF NOT EXISTS ix_chat_messages_external_message_id ON chat_messages(external_message_id)" | ||
| ) | ||
|
|
||
|
|
||
| def downgrade() -> None: | ||
| op.execute("DROP INDEX IF EXISTS ix_chat_messages_external_message_id") | ||
| op.execute("ALTER TABLE chat_messages DROP COLUMN IF EXISTS external_message_id") | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -67,6 +67,18 @@ async def drain(self) -> None: | |
| await self._tail | ||
|
|
||
|
|
||
| def _extract_feishu_message_id(response: dict | None) -> str | None: | ||
| """Extract message_id from Feishu send-message responses when available.""" | ||
| if not isinstance(response, dict): | ||
| return None | ||
| data = response.get("data") | ||
| if isinstance(data, dict): | ||
| message_id = data.get("message_id") | ||
| if message_id: | ||
| return str(message_id) | ||
| return None | ||
|
|
||
|
|
||
| # ─── OAuth ────────────────────────────────────────────── | ||
|
|
||
| from fastapi.responses import HTMLResponse, Response | ||
|
|
@@ -330,6 +342,7 @@ async def process_feishu_event(agent_id: uuid.UUID, body: dict, db: AsyncSession | |
|
|
||
| if event_type == "im.message.receive_v1": | ||
| message = event.get("message", {}) | ||
| message_id = message.get("message_id", "") | ||
| sender = event.get("sender", {}).get("sender_id", {}) | ||
| sender_open_id = sender.get("open_id", "") | ||
| sender_user_id_from_event = sender.get("user_id", "") # tenant-stable ID, available directly in event body | ||
|
|
@@ -439,30 +452,41 @@ async def process_feishu_event(agent_id: uuid.UUID, body: dict, db: AsyncSession | |
| # Load recent conversation history via session (session UUID may already exist) | ||
| from app.models.audit import ChatMessage | ||
| from app.models.agent import Agent as AgentModel | ||
| from app.services.channel_session import find_or_create_channel_session | ||
| from app.models.chat_session import ChatSession as _ChatSession | ||
| from app.services.channel_session import find_or_create_channel_session, load_shared_channel_history | ||
| agent_r = await db.execute(select(AgentModel).where(AgentModel.id == agent_id)) | ||
| agent_obj = agent_r.scalar_one_or_none() | ||
| creator_id = agent_obj.creator_id if agent_obj else agent_id | ||
| from app.models.agent import DEFAULT_CONTEXT_WINDOW_SIZE | ||
| ctx_size = (agent_obj.context_window_size or DEFAULT_CONTEXT_WINDOW_SIZE) if agent_obj else DEFAULT_CONTEXT_WINDOW_SIZE | ||
|
|
||
| # Pre-resolve session so history lookup uses the UUID (session created later if new) | ||
| _pre_sess_r = await db.execute( | ||
| select(__import__('app.models.chat_session', fromlist=['ChatSession']).ChatSession).where( | ||
| __import__('app.models.chat_session', fromlist=['ChatSession']).ChatSession.agent_id == agent_id, | ||
| __import__('app.models.chat_session', fromlist=['ChatSession']).ChatSession.external_conv_id == conv_id, | ||
| if chat_type == "group" and chat_id: | ||
| history = await load_shared_channel_history( | ||
| db, | ||
| current_agent_id=agent_id, | ||
| current_tenant_id=agent_obj.tenant_id if agent_obj else None, | ||
| external_conv_id=conv_id, | ||
| source_channel="feishu", | ||
| limit=ctx_size, | ||
| ) | ||
|
Comment on lines
+464
to
471
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This shared-history fetch runs before the current inbound message is persisted for this agent, so in multi-agent groups a slower agent can read the same Feishu event that another agent already wrote and then send that same text again as Useful? React with 👍 / 👎. |
||
| ) | ||
| _pre_sess = _pre_sess_r.scalar_one_or_none() | ||
| _history_conv_id = str(_pre_sess.id) if _pre_sess else conv_id | ||
| history_result = await db.execute( | ||
| select(ChatMessage) | ||
| .where(ChatMessage.agent_id == agent_id, ChatMessage.conversation_id == _history_conv_id) | ||
| .order_by(ChatMessage.created_at.desc()) | ||
| .limit(ctx_size) | ||
| ) | ||
| history_msgs = history_result.scalars().all() | ||
| history = [{"role": m.role, "content": m.content} for m in reversed(history_msgs)] | ||
| else: | ||
| # Keep P2P behavior unchanged: use the current agent's own session history. | ||
| _pre_sess_r = await db.execute( | ||
| select(_ChatSession).where( | ||
| _ChatSession.agent_id == agent_id, | ||
| _ChatSession.external_conv_id == conv_id, | ||
| ) | ||
| ) | ||
| _pre_sess = _pre_sess_r.scalar_one_or_none() | ||
| _history_conv_id = str(_pre_sess.id) if _pre_sess else conv_id | ||
| history_result = await db.execute( | ||
| select(ChatMessage) | ||
| .where(ChatMessage.agent_id == agent_id, ChatMessage.conversation_id == _history_conv_id) | ||
| .order_by(ChatMessage.created_at.desc()) | ||
| .limit(ctx_size) | ||
| ) | ||
| history_msgs = history_result.scalars().all() | ||
| history = [{"role": m.role, "content": m.content} for m in reversed(history_msgs)] | ||
|
|
||
| # --- Resolve Feishu sender identity & find/create platform user --- | ||
| import uuid as _uuid | ||
|
|
@@ -577,7 +601,16 @@ async def process_feishu_event(agent_id: uuid.UUID, body: dict, db: AsyncSession | |
| session_conv_id = str(_sess.id) | ||
|
|
||
| # Save user message | ||
| db.add(ChatMessage(agent_id=agent_id, user_id=platform_user_id, role="user", content=user_text, conversation_id=session_conv_id)) | ||
| db.add( | ||
| ChatMessage( | ||
| agent_id=agent_id, | ||
| user_id=platform_user_id, | ||
| role="user", | ||
| content=user_text, | ||
| conversation_id=session_conv_id, | ||
| external_message_id=message_id or None, | ||
| ) | ||
| ) | ||
| _sess.last_message_at = _dt.now(_tz.utc) | ||
| await db.commit() | ||
|
|
||
|
|
@@ -1066,7 +1099,18 @@ async def _heartbeat(): | |
| logger.error(f"[Feishu] Failed to create task: {e}") | ||
|
|
||
| # Save assistant reply to history (use platform_user_id so messages stay in one session) | ||
| db.add(ChatMessage(agent_id=agent_id, user_id=platform_user_id, role="assistant", content=reply_text, conversation_id=session_conv_id)) | ||
| db.add( | ||
| ChatMessage( | ||
| agent_id=agent_id, | ||
| user_id=platform_user_id, | ||
| role="assistant", | ||
| content=reply_text, | ||
| conversation_id=session_conv_id, | ||
| # CardKit flow does not currently expose a stable message_id here. | ||
| # IM patch fallback does, so persist it when available. | ||
| external_message_id=msg_id_for_patch or None, | ||
| ) | ||
| ) | ||
| _sess.last_message_at = _dt.now(_tz.utc) | ||
| await db.commit() | ||
|
|
||
|
|
@@ -1233,9 +1277,16 @@ async def _handle_feishu_file(db, agent_id, config, message, sender_open_id, cha | |
| user_msg_content = f"[用户发送了图片]\n{_image_marker}" | ||
| else: | ||
| user_msg_content = f"[file:{filename}]" | ||
| db.add(ChatMessage(agent_id=agent_id, user_id=platform_user_id, role="user", | ||
| content=user_msg_content if msg_type != "image" else f"[file:{filename}]", | ||
| conversation_id=session_conv_id)) | ||
| db.add( | ||
| ChatMessage( | ||
| agent_id=agent_id, | ||
| user_id=platform_user_id, | ||
| role="user", | ||
| content=user_msg_content if msg_type != "image" else f"[file:{filename}]", | ||
| conversation_id=session_conv_id, | ||
| external_message_id=message_id or None, | ||
| ) | ||
| ) | ||
| _sess.last_message_at = _dt.now(_tz.utc) | ||
|
|
||
| # Load conversation history for LLM context | ||
|
|
@@ -1399,24 +1450,34 @@ async def _img_heartbeat(): | |
| await asyncio.sleep(random.uniform(1.0, 2.0)) | ||
|
|
||
| ack = random.choice(_FILE_ACK_MESSAGES) | ||
| ack_message_id = None | ||
| try: | ||
| if chat_type == "group" and chat_id: | ||
| await feishu_service.send_message( | ||
| _ack_resp = await feishu_service.send_message( | ||
| config.app_id, config.app_secret, chat_id, "text", | ||
| json.dumps({"text": ack}), receive_id_type="chat_id", | ||
| ) | ||
| else: | ||
| await feishu_service.send_message( | ||
| _ack_resp = await feishu_service.send_message( | ||
| config.app_id, config.app_secret, sender_open_id, "text", | ||
| json.dumps({"text": ack}), | ||
| ) | ||
| ack_message_id = _extract_feishu_message_id(_ack_resp) | ||
| except Exception as e: | ||
| logger.error(f"[Feishu] Failed to send ack: {e}") | ||
|
|
||
| # Store ack in DB | ||
| async with _async_session() as db2: | ||
| db2.add(ChatMessage(agent_id=agent_id, user_id=platform_user_id, role="assistant", | ||
| content=ack, conversation_id=session_conv_id)) | ||
| db2.add( | ||
| ChatMessage( | ||
| agent_id=agent_id, | ||
| user_id=platform_user_id, | ||
| role="assistant", | ||
| content=ack, | ||
| conversation_id=session_conv_id, | ||
| external_message_id=ack_message_id, | ||
| ) | ||
| ) | ||
| await db2.commit() | ||
|
|
||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,4 @@ | ||
| """Shared helper: find-or-create ChatSession by external channel conv_id. | ||
|
|
||
| Used by feishu.py, slack.py, discord_bot.py, wecom.py, teams.py — eliminates in-process caches. | ||
| """ | ||
| """Shared helpers for external channel sessions and shared public transcript reads.""" | ||
| import uuid as _uuid | ||
| from datetime import datetime, timezone | ||
|
|
||
|
|
@@ -65,3 +62,133 @@ async def find_or_create_channel_session( | |
| session.title = group_name[:40] | ||
|
|
||
| return session | ||
|
|
||
|
|
||
| def _normalize_shared_channel_history( | ||
| messages: list, | ||
| current_agent_id: _uuid.UUID, | ||
| user_names: dict[_uuid.UUID, str], | ||
| agent_names: dict[_uuid.UUID, str], | ||
| limit: int, | ||
| ) -> list[dict]: | ||
| """Normalize raw group messages into prompt-safe shared transcript entries. | ||
|
|
||
| - Human messages remain `user` messages with explicit speaker labels. | ||
| - The current agent's prior replies remain `assistant` messages. | ||
| - Other agents' prior replies are converted into visible public transcript | ||
| lines as `user` messages so they are not mistaken for the current | ||
| assistant's own prior output. | ||
| - Duplicate public human messages from multiple per-agent sessions are | ||
| deduplicated within a short time bucket. | ||
| """ | ||
| seen: set[tuple] = set() | ||
| external_message_positions: dict[str, int] = {} | ||
| normalized: list[dict] = [] | ||
|
|
||
| for message in messages: | ||
| if message.role not in ("user", "assistant"): | ||
| continue | ||
|
|
||
| if message.role == "assistant": | ||
| speaker_name = agent_names.get(message.agent_id, "未知智能体") | ||
| if message.agent_id == current_agent_id: | ||
| entry = {"role": "assistant", "content": message.content} | ||
| else: | ||
| entry = {"role": "user", "content": f"[其他智能体 {speaker_name}] {message.content}"} | ||
| dedupe_speaker = ("assistant", message.agent_id) | ||
| entry_priority = 2 | ||
| else: | ||
| speaker_name = user_names.get(message.user_id, "群成员") | ||
| entry = {"role": "user", "content": f"[群成员 {speaker_name}] {message.content}"} | ||
| dedupe_speaker = ("user", message.user_id) | ||
| entry_priority = 1 | ||
|
|
||
| external_message_id = getattr(message, "external_message_id", None) | ||
| created_at = getattr(message, "created_at", None) | ||
| bucket = int(created_at.timestamp() // 10) if created_at else None | ||
| if external_message_id: | ||
| existing_pos = external_message_positions.get(external_message_id) | ||
| if existing_pos is None: | ||
| external_message_positions[external_message_id] = len(normalized) | ||
| normalized.append({**entry, "_priority": entry_priority}) | ||
| else: | ||
| if entry_priority > normalized[existing_pos].get("_priority", 0): | ||
| normalized[existing_pos] = {**entry, "_priority": entry_priority} | ||
| continue | ||
| else: | ||
| dedupe_key = (dedupe_speaker, entry["content"], bucket) | ||
| if dedupe_key in seen: | ||
| continue | ||
| seen.add(dedupe_key) | ||
| normalized.append({**entry, "_priority": entry_priority}) | ||
|
Comment on lines
+119
to
+123
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The fallback bucket dedupe is applied unconditionally to messages without Useful? React with 👍 / 👎. |
||
|
|
||
| if len(normalized) > limit: | ||
| normalized = normalized[-limit:] | ||
| return [{"role": entry["role"], "content": entry["content"]} for entry in normalized] | ||
|
|
||
|
|
||
| async def load_shared_channel_history( | ||
| db: AsyncSession, | ||
| *, | ||
| current_agent_id: _uuid.UUID, | ||
| current_tenant_id: _uuid.UUID | None, | ||
| external_conv_id: str, | ||
| source_channel: str, | ||
| limit: int = 100, | ||
| ) -> list[dict]: | ||
| """Load shared public history across all sessions of the same external chat. | ||
|
|
||
| This is intentionally scoped to public channel context. Persistence remains | ||
| per-agent, but prompt construction for shared channels can read a unified | ||
| transcript keyed by `(source_channel, external_conv_id)`. | ||
| """ | ||
| from app.models.agent import Agent | ||
| from app.models.audit import ChatMessage | ||
| from app.models.user import User | ||
|
|
||
| if not current_tenant_id: | ||
| return [] | ||
|
|
||
| sessions_result = await db.execute( | ||
| select(ChatSession.id) | ||
| .join(Agent, Agent.id == ChatSession.agent_id) | ||
| .where( | ||
| ChatSession.external_conv_id == external_conv_id, | ||
| ChatSession.source_channel == source_channel, | ||
| ChatSession.is_group == True, | ||
| Agent.tenant_id == current_tenant_id, | ||
| ) | ||
| ) | ||
| session_ids = [str(row[0]) for row in sessions_result.fetchall()] | ||
| if not session_ids: | ||
| return [] | ||
|
|
||
| raw_limit = max(limit * 4, limit) | ||
| messages_result = await db.execute( | ||
| select(ChatMessage) | ||
| .where(ChatMessage.conversation_id.in_(session_ids)) | ||
| .order_by(ChatMessage.created_at.desc()) | ||
| .limit(raw_limit) | ||
| ) | ||
| messages = list(reversed(messages_result.scalars().all())) | ||
| if not messages: | ||
| return [] | ||
|
|
||
| user_ids = {m.user_id for m in messages if getattr(m, "user_id", None)} | ||
| agent_ids = {m.agent_id for m in messages if getattr(m, "agent_id", None)} | ||
|
|
||
| user_names: dict[_uuid.UUID, str] = {} | ||
| if user_ids: | ||
| user_result = await db.execute( | ||
| select(User.id, User.display_name, User.username).where(User.id.in_(user_ids)) | ||
| ) | ||
| for user_id, display_name, username in user_result.fetchall(): | ||
| user_names[user_id] = display_name or username or "群成员" | ||
|
|
||
| agent_names: dict[_uuid.UUID, str] = {} | ||
| if agent_ids: | ||
| agent_result = await db.execute(select(Agent.id, Agent.name).where(Agent.id.in_(agent_ids))) | ||
| for agent_id, name in agent_result.fetchall(): | ||
| agent_names[agent_id] = name or "未知智能体" | ||
|
|
||
| return _normalize_shared_channel_history(messages, current_agent_id, user_names, agent_names, limit) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Setting
down_revisionto20260313_column_modifycreates a second migration head (add_chat_msg_ext_id) instead of extending the existing head chain (currently ending atf1a2b3c4d5e6), soalembic upgrade headbecomes ambiguous. This repository’s startup path callsalembic upgrade headinbackend/entrypoint.sh, so upgrades can fail before this column is applied while new Feishu writes already referenceexternal_message_id.Useful? React with 👍 / 👎.