Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions backend/alembic/versions/add_chat_message_external_message_id.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""Add external_message_id to chat_messages for channel-native dedupe.

Revision ID: add_chat_msg_ext_id
Revises: add_microsoft_teams_support
"""

from alembic import op


# Alembic revision identifiers for this migration.
# NOTE(review): the module docstring lists "Revises: add_microsoft_teams_support",
# which differs from the down_revision value below — confirm the intended parent.
revision = "add_chat_msg_ext_id"
down_revision = "20260313_column_modify"
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Rebase migration onto the current Alembic head

Setting down_revision to 20260313_column_modify creates a second migration head (add_chat_msg_ext_id) instead of extending the existing head chain (currently ending at f1a2b3c4d5e6), so alembic upgrade head becomes ambiguous. This repository’s startup path calls alembic upgrade head in backend/entrypoint.sh, so upgrades can fail before this column is applied while new Feishu writes already reference external_message_id.

Useful? React with 👍 / 👎.

# No branch labels or cross-revision dependencies are declared for this migration.
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Add the `external_message_id` column to `chat_messages` and index it.

    Both statements use `IF NOT EXISTS`, so they are written to tolerate a
    column or index that is already present.
    """
    statements = (
        "ALTER TABLE chat_messages ADD COLUMN IF NOT EXISTS external_message_id VARCHAR(255)",
        "CREATE INDEX IF NOT EXISTS ix_chat_messages_external_message_id ON chat_messages(external_message_id)",
    )
    # Order matters: the column has to exist before it can be indexed.
    for statement in statements:
        op.execute(statement)


def downgrade() -> None:
    """Drop the `external_message_id` index and then the column itself.

    Both statements use `IF EXISTS`, so they are written to tolerate an index
    or column that is already gone.
    """
    statements = (
        "DROP INDEX IF EXISTS ix_chat_messages_external_message_id",
        "ALTER TABLE chat_messages DROP COLUMN IF EXISTS external_message_id",
    )
    # Reverse of upgrade(): remove the index first, then the column it covers.
    for statement in statements:
        op.execute(statement)
113 changes: 87 additions & 26 deletions backend/app/api/feishu.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,18 @@ async def drain(self) -> None:
await self._tail


def _extract_feishu_message_id(response: dict | None) -> str | None:
"""Extract message_id from Feishu send-message responses when available."""
if not isinstance(response, dict):
return None
data = response.get("data")
if isinstance(data, dict):
message_id = data.get("message_id")
if message_id:
return str(message_id)
return None


# ─── OAuth ──────────────────────────────────────────────

from fastapi.responses import HTMLResponse, Response
Expand Down Expand Up @@ -330,6 +342,7 @@ async def process_feishu_event(agent_id: uuid.UUID, body: dict, db: AsyncSession

if event_type == "im.message.receive_v1":
message = event.get("message", {})
message_id = message.get("message_id", "")
sender = event.get("sender", {}).get("sender_id", {})
sender_open_id = sender.get("open_id", "")
sender_user_id_from_event = sender.get("user_id", "") # tenant-stable ID, available directly in event body
Expand Down Expand Up @@ -439,30 +452,41 @@ async def process_feishu_event(agent_id: uuid.UUID, body: dict, db: AsyncSession
# Load recent conversation history via session (session UUID may already exist)
from app.models.audit import ChatMessage
from app.models.agent import Agent as AgentModel
from app.services.channel_session import find_or_create_channel_session
from app.models.chat_session import ChatSession as _ChatSession
from app.services.channel_session import find_or_create_channel_session, load_shared_channel_history
agent_r = await db.execute(select(AgentModel).where(AgentModel.id == agent_id))
agent_obj = agent_r.scalar_one_or_none()
creator_id = agent_obj.creator_id if agent_obj else agent_id
from app.models.agent import DEFAULT_CONTEXT_WINDOW_SIZE
ctx_size = (agent_obj.context_window_size or DEFAULT_CONTEXT_WINDOW_SIZE) if agent_obj else DEFAULT_CONTEXT_WINDOW_SIZE

# Pre-resolve session so history lookup uses the UUID (session created later if new)
_pre_sess_r = await db.execute(
select(__import__('app.models.chat_session', fromlist=['ChatSession']).ChatSession).where(
__import__('app.models.chat_session', fromlist=['ChatSession']).ChatSession.agent_id == agent_id,
__import__('app.models.chat_session', fromlist=['ChatSession']).ChatSession.external_conv_id == conv_id,
if chat_type == "group" and chat_id:
history = await load_shared_channel_history(
db,
current_agent_id=agent_id,
current_tenant_id=agent_obj.tenant_id if agent_obj else None,
external_conv_id=conv_id,
source_channel="feishu",
limit=ctx_size,
)
Comment on lines +464 to 471
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Exclude current event from shared group history

This shared-history fetch runs before the current inbound message is persisted for this agent, so in multi-agent groups a slower agent can read the same Feishu event that another agent already wrote and then send that same text again as llm_user_text. In that timing window the model receives the current turn twice (once from history, once as the live user input), which can cause duplicated or over-weighted responses. Filter shared history by the current message_id (or append history after removing the in-flight event) before invoking the LLM.

Useful? React with 👍 / 👎.

)
_pre_sess = _pre_sess_r.scalar_one_or_none()
_history_conv_id = str(_pre_sess.id) if _pre_sess else conv_id
history_result = await db.execute(
select(ChatMessage)
.where(ChatMessage.agent_id == agent_id, ChatMessage.conversation_id == _history_conv_id)
.order_by(ChatMessage.created_at.desc())
.limit(ctx_size)
)
history_msgs = history_result.scalars().all()
history = [{"role": m.role, "content": m.content} for m in reversed(history_msgs)]
else:
# Keep P2P behavior unchanged: use the current agent's own session history.
_pre_sess_r = await db.execute(
select(_ChatSession).where(
_ChatSession.agent_id == agent_id,
_ChatSession.external_conv_id == conv_id,
)
)
_pre_sess = _pre_sess_r.scalar_one_or_none()
_history_conv_id = str(_pre_sess.id) if _pre_sess else conv_id
history_result = await db.execute(
select(ChatMessage)
.where(ChatMessage.agent_id == agent_id, ChatMessage.conversation_id == _history_conv_id)
.order_by(ChatMessage.created_at.desc())
.limit(ctx_size)
)
history_msgs = history_result.scalars().all()
history = [{"role": m.role, "content": m.content} for m in reversed(history_msgs)]

# --- Resolve Feishu sender identity & find/create platform user ---
import uuid as _uuid
Expand Down Expand Up @@ -577,7 +601,16 @@ async def process_feishu_event(agent_id: uuid.UUID, body: dict, db: AsyncSession
session_conv_id = str(_sess.id)

# Save user message
db.add(ChatMessage(agent_id=agent_id, user_id=platform_user_id, role="user", content=user_text, conversation_id=session_conv_id))
db.add(
ChatMessage(
agent_id=agent_id,
user_id=platform_user_id,
role="user",
content=user_text,
conversation_id=session_conv_id,
external_message_id=message_id or None,
)
)
_sess.last_message_at = _dt.now(_tz.utc)
await db.commit()

Expand Down Expand Up @@ -1066,7 +1099,18 @@ async def _heartbeat():
logger.error(f"[Feishu] Failed to create task: {e}")

# Save assistant reply to history (use platform_user_id so messages stay in one session)
db.add(ChatMessage(agent_id=agent_id, user_id=platform_user_id, role="assistant", content=reply_text, conversation_id=session_conv_id))
db.add(
ChatMessage(
agent_id=agent_id,
user_id=platform_user_id,
role="assistant",
content=reply_text,
conversation_id=session_conv_id,
# CardKit flow does not currently expose a stable message_id here.
# IM patch fallback does, so persist it when available.
external_message_id=msg_id_for_patch or None,
)
)
_sess.last_message_at = _dt.now(_tz.utc)
await db.commit()

Expand Down Expand Up @@ -1233,9 +1277,16 @@ async def _handle_feishu_file(db, agent_id, config, message, sender_open_id, cha
user_msg_content = f"[用户发送了图片]\n{_image_marker}"
else:
user_msg_content = f"[file:{filename}]"
db.add(ChatMessage(agent_id=agent_id, user_id=platform_user_id, role="user",
content=user_msg_content if msg_type != "image" else f"[file:{filename}]",
conversation_id=session_conv_id))
db.add(
ChatMessage(
agent_id=agent_id,
user_id=platform_user_id,
role="user",
content=user_msg_content if msg_type != "image" else f"[file:{filename}]",
conversation_id=session_conv_id,
external_message_id=message_id or None,
)
)
_sess.last_message_at = _dt.now(_tz.utc)

# Load conversation history for LLM context
Expand Down Expand Up @@ -1399,24 +1450,34 @@ async def _img_heartbeat():
await asyncio.sleep(random.uniform(1.0, 2.0))

ack = random.choice(_FILE_ACK_MESSAGES)
ack_message_id = None
try:
if chat_type == "group" and chat_id:
await feishu_service.send_message(
_ack_resp = await feishu_service.send_message(
config.app_id, config.app_secret, chat_id, "text",
json.dumps({"text": ack}), receive_id_type="chat_id",
)
else:
await feishu_service.send_message(
_ack_resp = await feishu_service.send_message(
config.app_id, config.app_secret, sender_open_id, "text",
json.dumps({"text": ack}),
)
ack_message_id = _extract_feishu_message_id(_ack_resp)
except Exception as e:
logger.error(f"[Feishu] Failed to send ack: {e}")

# Store ack in DB
async with _async_session() as db2:
db2.add(ChatMessage(agent_id=agent_id, user_id=platform_user_id, role="assistant",
content=ack, conversation_id=session_conv_id))
db2.add(
ChatMessage(
agent_id=agent_id,
user_id=platform_user_id,
role="assistant",
content=ack,
conversation_id=session_conv_id,
external_message_id=ack_message_id,
)
)
await db2.commit()


Expand Down
1 change: 1 addition & 0 deletions backend/app/models/audit.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class ChatMessage(Base):
)
content: Mapped[str] = mapped_column(Text, nullable=False)
conversation_id: Mapped[str] = mapped_column(String(200), default="web", nullable=False)
external_message_id: Mapped[str | None] = mapped_column(String(255), nullable=True, index=True)
# Participant identity (unified User/Agent identity)
participant_id: Mapped[uuid.UUID | None] = mapped_column(UUID(as_uuid=True), ForeignKey("participants.id"), nullable=True)
# Model thinking process
Expand Down
135 changes: 131 additions & 4 deletions backend/app/services/channel_session.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
"""Shared helper: find-or-create ChatSession by external channel conv_id.

Used by feishu.py, slack.py, discord_bot.py, wecom.py, teams.py — eliminates in-process caches.
"""
"""Shared helpers for external channel sessions and shared public transcript reads."""
import uuid as _uuid
from datetime import datetime, timezone

Expand Down Expand Up @@ -65,3 +62,133 @@ async def find_or_create_channel_session(
session.title = group_name[:40]

return session


def _normalize_shared_channel_history(
    messages: list,
    current_agent_id: _uuid.UUID,
    user_names: dict[_uuid.UUID, str],
    agent_names: dict[_uuid.UUID, str],
    limit: int,
) -> list[dict]:
    """Normalize raw group messages into prompt-safe shared transcript entries.

    - Human messages remain `user` messages with explicit speaker labels.
    - The current agent's prior replies remain `assistant` messages.
    - Other agents' prior replies are converted into visible public transcript
      lines as `user` messages so they are not mistaken for the current
      assistant's own prior output.
    - Rows that share an `external_message_id` collapse into a single entry at
      the position of the first occurrence; an assistant row replaces a human
      row with the same id, never the other way round.
    - Human messages without an external id are deduplicated per
      (user, rendered content, 10-second bucket of `created_at`).
    - Assistant messages without an external id are never bucket-deduplicated.

    Args:
        messages: Message rows in chronological order. Each row is read for
            `role`, `content`, `agent_id`, `user_id` and, when present,
            `external_message_id` and `created_at`.
        current_agent_id: The agent whose prompt is being built.
        user_names: Display names keyed by user id.
        agent_names: Display names keyed by agent id.
        limit: Maximum number of entries to return; the most recent entries
            are kept. A non-positive limit yields an empty list.

    Returns:
        A list of `{"role": ..., "content": ...}` dicts, oldest first.
    """
    # `normalized[-0:]` would return the whole list, so handle this up front.
    if limit <= 0:
        return []

    seen_human_keys: set[tuple] = set()
    external_message_positions: dict[str, int] = {}
    normalized: list[dict] = []
    # Parallel to `normalized`: 2 for assistant rows, 1 for human rows.
    priorities: list[int] = []

    for message in messages:
        role = message.role
        if role == "assistant":
            speaker_name = agent_names.get(message.agent_id, "未知智能体")
            if message.agent_id == current_agent_id:
                entry = {"role": "assistant", "content": message.content}
            else:
                entry = {"role": "user", "content": f"[其他智能体 {speaker_name}] {message.content}"}
            entry_priority = 2
        elif role == "user":
            speaker_name = user_names.get(message.user_id, "群成员")
            entry = {"role": "user", "content": f"[群成员 {speaker_name}] {message.content}"}
            entry_priority = 1
        else:
            # Any other role is left out of the shared transcript.
            continue

        external_message_id = getattr(message, "external_message_id", None)
        if external_message_id:
            existing_pos = external_message_positions.get(external_message_id)
            if existing_pos is None:
                external_message_positions[external_message_id] = len(normalized)
                normalized.append(entry)
                priorities.append(entry_priority)
            elif entry_priority > priorities[existing_pos]:
                normalized[existing_pos] = entry
                priorities[existing_pos] = entry_priority
            continue

        # Fallback time-bucket dedupe applies to human rows only. Assistant
        # rows are often stored without an external id, and two legitimate
        # identical replies from one agent within the same bucket must both
        # stay in the shared context.
        if role == "user":
            created_at = getattr(message, "created_at", None)
            bucket = int(created_at.timestamp() // 10) if created_at else None
            dedupe_key = (message.user_id, entry["content"], bucket)
            if dedupe_key in seen_human_keys:
                continue
            seen_human_keys.add(dedupe_key)

        normalized.append(entry)
        priorities.append(entry_priority)

    return normalized[-limit:]


async def load_shared_channel_history(
    db: AsyncSession,
    *,
    current_agent_id: _uuid.UUID,
    current_tenant_id: _uuid.UUID | None,
    external_conv_id: str,
    source_channel: str,
    limit: int = 100,
) -> list[dict]:
    """Build one shared transcript from every group session of an external chat.

    Sessions are matched on `(source_channel, external_conv_id)`, must be
    flagged as group sessions, and must belong to agents of
    `current_tenant_id`. Messages from all matching sessions are merged in
    chronological order and passed to `_normalize_shared_channel_history`.

    Args:
        db: Async database session used for all queries.
        current_agent_id: The agent whose prompt is being built.
        current_tenant_id: Tenant to scope the session lookup to. When falsy,
            no history is returned.
        external_conv_id: Conversation id assigned by the external channel.
        source_channel: Channel name stored on the session (e.g. "feishu").
        limit: Maximum number of transcript entries to return.

    Returns:
        Normalized `{"role", "content"}` entries, oldest first. The list is
        empty when there is no tenant, no matching session, or no message.
    """
    from app.models.agent import Agent
    from app.models.audit import ChatMessage
    from app.models.user import User

    if not current_tenant_id:
        return []

    session_query = (
        select(ChatSession.id)
        .join(Agent, Agent.id == ChatSession.agent_id)
        .where(
            ChatSession.external_conv_id == external_conv_id,
            ChatSession.source_channel == source_channel,
            # Builds a SQL predicate, so the explicit comparison is intentional.
            ChatSession.is_group == True,  # noqa: E712
            Agent.tenant_id == current_tenant_id,
        )
    )
    session_rows = (await db.execute(session_query)).fetchall()
    # The message query below matches on the string form of each session id.
    session_ids = [str(row[0]) for row in session_rows]
    if not session_ids:
        return []

    # Over-fetch raw rows: several of them may collapse into one transcript
    # entry during normalization.
    raw_limit = max(limit * 4, limit)
    message_query = (
        select(ChatMessage)
        .where(ChatMessage.conversation_id.in_(session_ids))
        .order_by(ChatMessage.created_at.desc())
        .limit(raw_limit)
    )
    messages = list((await db.execute(message_query)).scalars().all())
    # Rows arrive newest first; flip them into chronological order.
    messages.reverse()
    if not messages:
        return []

    user_ids = set()
    agent_ids = set()
    for row in messages:
        if getattr(row, "user_id", None):
            user_ids.add(row.user_id)
        if getattr(row, "agent_id", None):
            agent_ids.add(row.agent_id)

    user_names: dict[_uuid.UUID, str] = {}
    if user_ids:
        user_rows = await db.execute(
            select(User.id, User.display_name, User.username).where(User.id.in_(user_ids))
        )
        user_names = {
            uid: display_name or username or "群成员"
            for uid, display_name, username in user_rows.fetchall()
        }

    agent_names: dict[_uuid.UUID, str] = {}
    if agent_ids:
        agent_rows = await db.execute(
            select(Agent.id, Agent.name).where(Agent.id.in_(agent_ids))
        )
        agent_names = {aid: name or "未知智能体" for aid, name in agent_rows.fetchall()}

    return _normalize_shared_channel_history(messages, current_agent_id, user_names, agent_names, limit)
Loading