From 0dd578daf1cc142039365abf4298c118e546cd77 Mon Sep 17 00:00:00 2001 From: cyning Date: Tue, 9 Jun 2026 17:40:11 +0800 Subject: [PATCH] =?UTF-8?q?refactor(api):=20W8=20Intent=20=E6=A0=88?= =?UTF-8?q?=E6=8B=86=E5=88=86=EF=BC=88rules=20+=20LLM=20=E5=AD=90=E6=A8=A1?= =?UTF-8?q?=E5=9D=97=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Epic api-modularization W8:表驱动规则与 LLM/cache/retry 分文件; intent_agent 保留模型与公开 re-export,测试 monkeypatch 路径不变。 Co-authored-by: Cursor --- api/intent_agent.py | 683 +----------------- api/intent_llm.py | 656 +++++++++++++++++ api/intent_router.py | 192 +---- api/intent_router_rules.py | 166 +++++ .../active/task_api_intent_stack_split_w8.md | 81 +++ 5 files changed, 942 insertions(+), 836 deletions(-) create mode 100644 api/intent_llm.py create mode 100644 api/intent_router_rules.py create mode 100644 docs/tasks/active/task_api_intent_stack_split_w8.md diff --git a/api/intent_agent.py b/api/intent_agent.py index e4a30274..dbf57cc4 100644 --- a/api/intent_agent.py +++ b/api/intent_agent.py @@ -1,174 +1,22 @@ +"""Intent V2 公开模型与 hints 仲裁;LLM 路径见 `intent_llm`。""" + from __future__ import annotations -import asyncio -import hashlib -import json -import logging -import os -import re -import time -from collections import OrderedDict from dataclasses import dataclass, replace from typing import Any, Literal -from openai import ( - APIConnectionError, - APIStatusError, - APITimeoutError, - InternalServerError, - OpenAI, - RateLimitError, -) - from .intent_hints import ( arbitration_enabled, - build_intent_hints_prompt_block, hints_arbitration_should_apply, load_resolved_hints, ) from .intent_router import decide_intent as decide_intent_v1 from .rag_env import openai_siliconflow_client -from .text2sql_core import is_text2sql_intent -from .tools import Tool, tool_mode_map ToolName = Literal["rag_search", "text2sql_query", "direct_answer"] V1Mode = Literal["rag", "text2sql", "no_data"] -_logger = logging.getLogger(__name__) - -_VOLATILE_RAW_KEYS = frozenset({"cache", "cache_key_hash", "latency_ms", "llm_prompts"}) - - -class LRUCache: - """IntentDecision 缓存:TTL 到期失效 + 超容量 LRU 淘汰。""" - - def __init__(self, *, maxsize: int, ttl_s: float) -> None: - self._maxsize = maxsize - self._ttl_s = ttl_s - self._items: OrderedDict[str, tuple[float, Any]] = OrderedDict() - - def clear(self) -> None: - self._items.clear() - - def get(self, key: str) -> Any | None: - now = time.time() - it = self._items.get(key) - if not it: - return None - expires_at, val = it - if expires_at <= now: - try: - del self._items[key] - except KeyError: - pass - return None - # refresh LRU - self._items.move_to_end(key) - return val - - def set(self, key: str, val: Any) -> None: - expires_at = time.time() + self._ttl_s - self._items[key] = (expires_at, val) - self._items.move_to_end(key) - while len(self._items) > self._maxsize: - self._items.popitem(last=False) - - -# 全局 Intent 缓存:maxsize/TTL 与任务单 P1-C 对齐 -_intent_cache: LRUCache = LRUCache(maxsize=1000, ttl_s=300.0) - - -def clear_intent_cache() -> None: - """供基准脚本 / 测试做「冷启动」轮次清空缓存。""" - _intent_cache.clear() - - -def _debug_intent_cache_enabled() -> bool: - raw = (os.getenv("DEBUG_INTENT_CACHE", "") or "").strip().lower() - return raw in ("1", "true", "yes", "on") - - -def _intent_history_tail_for_hash(history: list[dict[str, Any]]) -> list[dict[str, str]]: - """最近 3 轮对话:最多 6 条 user/assistant 消息,仅 role + content。""" - tail = (history or [])[-6:] - out: list[dict[str, str]] = [] - for m in tail: - role = str(m.get("role", "") or "") - content = str(m.get("content", "") or "") - out.append({"role": role, "content": content}) - return out - - -def compute_history_hash(history: list[dict[str, Any]] | None) -> str: - """稳定 history 指纹:JSON sort_keys + sha256 前 16 hex。""" - normalized = _intent_history_tail_for_hash(history or []) - payload = json.dumps(normalized, sort_keys=True, ensure_ascii=False) - return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16] - - -def _intent_composite_cache_key(*, query: str, history: list[dict[str, Any]] | None) -> str: - """缓存主键:history_hash + query,不同 history 不会共用条目。""" - hh = compute_history_hash(history) - return f"{hh}\x1f{(query or '').strip()}" - - -def _cache_key_obs_hash(composite_key: str) -> str: - """可观测短哈希:不暴露 query 明文。""" - return hashlib.sha256(composite_key.encode("utf-8")).hexdigest()[:16] - - -def _raw_response_for_cache_store(raw: dict[str, Any]) -> dict[str, Any]: - """写入缓存前去掉本轮可观测字段与大段 prompt,避免污染命中副本。""" - return {k: v for k, v in raw.items() if k not in _VOLATILE_RAW_KEYS} - - -def _log_intent_cache_line(*, event: str, key_hash: str, latency_ms: int) -> None: - if not _debug_intent_cache_enabled(): - return - _logger.info("[intent-cache] %s key_hash=%s latency_ms=%s", event, key_hash, latency_ms) - - -def _exc_short_label(exc: BaseException) -> str: - return type(exc).__name__ - - -def _log_intent_retry_exhausted(*, attempt: int, max_retries: int, exc: BaseException) -> None: - _logger.warning( - "[intent-retry] exhausted attempt=%s max=%s err=%s → v1_fallback", - attempt, - max_retries, - _exc_short_label(exc), - ) - - -def _log_intent_retry_will_retry( - *, - attempt: int, - max_retries: int, - exc: BaseException, - timeout_s: float, - backoff_s: float, -) -> None: - _logger.warning( - "[intent-retry] attempt=%s/%s failed err=%s timeout_s=%s backoff_s=%s will_retry", - attempt, - max_retries, - _exc_short_label(exc), - timeout_s, - backoff_s, - ) - - -def _log_intent_retry_success(*, attempt: int, max_retries: int, timeout_s: float) -> None: - _logger.info( - "[intent-retry] success attempt=%s/%s timeout_s=%s", - attempt, - max_retries, - timeout_s, - ) - - @dataclass(frozen=True) class StructuredSignals: """用于 gating 的结构化信号(关键约束:RAG_RETRIEVE_EMPTY 的 SQL fallback 必须依赖这些信号)。""" @@ -216,56 +64,6 @@ def build_intent_path_obs(raw: dict[str, Any] | None) -> dict[str, Any]: } -def _intent_decision_for_cache_store(d: IntentDecision) -> IntentDecision: - return replace(d, raw_response=_raw_response_for_cache_store(dict(d.raw_response))) - - -def _attach_cache_observability( - d: IntentDecision, - *, - cache: Literal["hit", "miss"], - cache_key_hash: str, - latency_ms: int, -) -> IntentDecision: - merged = {**dict(d.raw_response), "cache": cache, "cache_key_hash": cache_key_hash, "latency_ms": latency_ms} - return replace(d, raw_response=merged) - - -def _clamp01(x: float) -> float: - if x < 0.0: - return 0.0 - if x > 1.0: - return 1.0 - return x - - -def _has_aggregation_keywords(query: str) -> bool: - q = (query or "").lower() - needles = ( - "多少", - "金额", - "收入", - "支出", - "人数", - "数量", - "总数", - "平均", - "最大", - "最小", - "top", - "排行", - "排名", - "趋势", - "对比", - "分组", - "group by", - "count", - "sum", - "avg", - ) - return any(n in q for n in needles) - - def apply_hints_arbitration( decision: IntentDecision, *, @@ -297,455 +95,28 @@ def apply_hints_arbitration( ) -def _fallback_tool_by_low_confidence(tool: ToolName) -> ToolName: - # 按总纲的“Intent 低置信度 fallback”映射 - if tool == "text2sql_query": - return "rag_search" - if tool == "rag_search": - return "direct_answer" - return "rag_search" - - -def _map_v1_mode_to_tool(mode: str) -> ToolName: - m = (mode or "").strip().lower() - if m == "text2sql": - return "text2sql_query" - if m == "rag": - return "rag_search" - return "direct_answer" - - -def _extract_json_obj(text: str) -> dict[str, Any] | None: - # 仅做稳健提取:找到第一个 {...} 区块解析 - s = text or "" - m = re.search(r"\{[\s\S]*\}", s) - if not m: - return None - raw = m.group(0) - try: - obj = json.loads(raw) - except Exception: # noqa: BLE001 - return None - if isinstance(obj, dict): - return obj - return None - - -async def _llm_decide_v2( - *, - oai: OpenAI, - query: str, - history: list[dict[str, Any]], - tools: list[Tool], - timeout_s: float, - capture_prompts: bool = False, -) -> tuple[dict[str, Any], list[dict[str, Any]] | None]: - # 让 LLM 按 spec 输出 tool/reasoning/confidence(并尽量使用结构化 JSON) - tools_desc = "\n\n".join([f"- {t.name}: {t.description}" for t in tools]) - # 与 compute_history_hash 的「最近 6 条」对齐,避免多轮指代时上下文过短 - history_block = "\n".join( - [f"{m.get('role', '?')}: {str(m.get('content', ''))[:200]}" for m in (history or [])[-6:]] - ).strip() or "无历史对话" - - hints_block = build_intent_hints_prompt_block(load_resolved_hints()).strip() - hints_section = f"\n\n{hints_block}\n" if hints_block else "" - - prompt = f"""你是 ChatBI V2 的意图识别器:在下列工具中选**恰好一个**,用于本仓库/本产品的对话路由(评测集亦按此口径)。 - -## 可用工具(描述以注册表为准) -{tools_desc} - -## 总原则 - -- **text2sql_query**:用户要**本库业务数据**的具体数值/排名/趋势/分组统计,且应由数据库查询给出答案。 -- **rag_search**:需要**项目内文档、规范、任务单、架构说明、评测口径、错误码约定、实现细节**等;或问题明显落在「本仓库怎么说/怎么做」而非百科通识一句带过。 -- **direct_answer**:**不依赖**内部文档即可完成——翻译、润色、创作、头脑风暴、纯算法题/语法教学、与当前产品/仓库无关的通识科普等。 -{hints_section} -## 「通用知识」vs「须查资料」(易错点) - -下列主题若**未**明确说「只要高中数学定义、不要项目文档」,在本产品中默认走 **rag_search**(便于对齐内部文档与评测口径): - -- 指标与评测:**macro-F1**、**confusion matrix**、准确率/Precision/Recall、分桶统计等; -- 工程约束:**CI**、**stub**、零外呼门禁、**P50/P95** 基准写法; -- 仓库与规范:**_tech_graph**、**intent_router**、**Supabase** 在本项目中的错误处理约定等。 - -若用户只要**与项目无关**的通识(例:「用通俗语言解释量子计算」),选 **direct_answer**。 - -## 多轮对话 - -- 必须结合 **历史对话** 做指代消解:「它/那/这个」继承上文主题。 -- 若上文在讨论 **Text2SQL / 销售额 / 查库**,本轮问「**要不要查数据库**」「**是否要走 SQL**」「路由边界」等——属于**产品能力/路由说明**,选 **rag_search**(查文档说明),**不要**因字面像常识而选 direct_answer。 -- 若上文是写作/翻译/生成示例代码,本轮续写、改写语气、再要例子——多为 **direct_answer**。 - -## 与 text2sql 的边界 - -| 用户问题 | 选择 | -|---------|------| -| 「昨天销售额是多少」 | **text2sql_query**(要真实数据) | -| 「怎么统计 heros 表」且明显教写法、不要执行 | **direct_answer** | -| 「heros 表有哪些字段」 | **rag_search** | - -## Few-shot(短) - -Q: 昨天销售额是多少? -{{"tool": "text2sql_query", "reasoning": "需要查库得到金额", "confidence": 0.95}} - -Q: 如何计算 confusion matrix -{{"tool": "rag_search", "reasoning": "评测/文档口径,宜检索项目内说明", "confidence": 0.9}} - -Q: 解释一下量子计算,用通俗语言 -{{"tool": "direct_answer", "reasoning": "与仓库无关的通识解释", "confidence": 0.88}} - -[历史] -user: 昨天销售额是多少 -assistant: 我可以通过 Text2SQL 去查询…… -user: 那需要查数据库吗 -{{"tool": "rag_search", "reasoning": "结合上文仍在谈查数路由,应查文档说明是否走库", "confidence": 0.86}} - -## 历史对话 -{history_block} - -## 当前用户问题 -{query} - -## 输出 -仅输出一个 JSON 对象,勿其它文字: -{{ - "tool": "rag_search | text2sql_query | direct_answer", - "reasoning": "用户可见的 1-2 句摘要", - "confidence": 0.0-1.0 -}} -""" - - intent_model = os.getenv("INTENT_LLM_MODEL", "deepseek-ai/DeepSeek-V4-Pro") - intent_messages: list[dict[str, str]] = [ - { - "role": "system", - "content": ( - "你是严谨的意图分类器。仅从工具名集合中选择;" - "只输出一个 JSON 对象,不要 Markdown、不要前后缀说明。" - ), - }, - {"role": "user", "content": prompt}, - ] - - def _sync_call() -> str: - res = oai.chat.completions.create( - model=intent_model, - messages=intent_messages, - temperature=0.0, - stream=False, - ) - return (res.choices[0].message.content or "").strip() - - text = await asyncio.wait_for(asyncio.to_thread(_sync_call), timeout=timeout_s) - obj = _extract_json_obj(text) - if obj is None: - raise ValueError("LLM intent 输出不是合法 JSON") - prompts: list[dict[str, Any]] | None = None - if capture_prompts: - prompts = [{"phase": "intent", "model": intent_model, "messages": intent_messages}] - return obj, prompts - - -def _effective_intent_llm_timeout_s(override: float) -> float: - """Intent LLM 单次 `wait_for` 上限:优先读 `CHATBI_V2_INTENT_TIMEOUT_S`,否则用调用方传入值。""" - raw = (os.getenv("CHATBI_V2_INTENT_TIMEOUT_S") or "").strip() - if raw: - try: - return max(0.5, min(120.0, float(raw))) - except ValueError: - pass - return max(0.5, min(120.0, float(override))) - - -def _intent_llm_max_retries() -> int: - """Intent LLM 外呼最大尝试次数(含首次);默认 3。""" - raw = (os.getenv("CHATBI_V2_INTENT_LLM_RETRIES") or "").strip() - if raw: - try: - return max(1, min(5, int(raw))) - except ValueError: - pass - return 3 - - -def _intent_llm_retry_backoff_s(attempt: int) -> float: - """第 attempt 次失败后的退避秒数(attempt 从 1 起)。""" - base = 0.15 - raw = (os.getenv("CHATBI_V2_INTENT_LLM_RETRY_BACKOFF_S") or "").strip() - if raw: - try: - base = max(0.0, min(5.0, float(raw))) - except ValueError: - pass - return base * (2 ** max(0, attempt - 1)) - - -def _intent_llm_retry_timeout_factors() -> tuple[float, ...]: - """各轮单次 wait_for 相对首轮的系数;末项用于超出长度的后续轮次。""" - raw = (os.getenv("CHATBI_V2_INTENT_RETRY_TIMEOUT_FACTORS") or "").strip() - if raw: - try: - parsed = tuple(max(0.1, min(1.0, float(x.strip()))) for x in raw.split(",") if x.strip()) - if parsed: - return parsed - except ValueError: - pass - return (1.0, 0.65, 0.4) - - -def _intent_llm_timeout_s_for_attempt(base_timeout_s: float, attempt: int) -> float: - """重试轮次逐步缩短单次 wait_for:首轮全量,后续轮按系数递减。""" - factors = _intent_llm_retry_timeout_factors() - idx = min(max(attempt, 1) - 1, len(factors) - 1) - scaled = base_timeout_s * factors[idx] - return max(0.5, min(120.0, scaled)) - - -def _intent_llm_retryable(exc: BaseException) -> bool: - """仅瞬态/超时类错误可重试;JSON 解析与 tool 校验失败不重试。""" - if isinstance(exc, (asyncio.TimeoutError, APITimeoutError)): - return True - if isinstance(exc, (APIConnectionError, InternalServerError, RateLimitError)): - return True - if isinstance(exc, APIStatusError): - code = getattr(exc, "status_code", None) - return code in (429, 502, 503, 504) - return False - - -async def _llm_decide_v2_with_retries( - *, - oai: OpenAI, - query: str, - history: list[dict[str, Any]], - tools: list[Tool], - timeout_s: float, - capture_prompts: bool = False, -) -> tuple[dict[str, Any], list[dict[str, Any]] | None, dict[str, Any]]: - """包装 `_llm_decide_v2`:可重试错误最多 `CHATBI_V2_INTENT_LLM_RETRIES` 次,耗尽后抛 TimeoutError 走 V1。""" - max_retries = _intent_llm_max_retries() - last_retryable: BaseException | None = None - for attempt in range(1, max_retries + 1): - attempt_timeout_s = _intent_llm_timeout_s_for_attempt(timeout_s, attempt) - try: - raw_obj, intent_prompts = await _llm_decide_v2( - oai=oai, - query=query, - history=history, - tools=tools, - timeout_s=attempt_timeout_s, - capture_prompts=capture_prompts, - ) - meta: dict[str, Any] = {"attempt": attempt, "timeout_s": attempt_timeout_s} - if attempt > 1: - meta["used"] = "llm_retry" - _log_intent_retry_success( - attempt=attempt, max_retries=max_retries, timeout_s=attempt_timeout_s - ) - return raw_obj, intent_prompts, meta - except Exception as exc: # noqa: BLE001 - if not _intent_llm_retryable(exc) or attempt >= max_retries: - if _intent_llm_retryable(exc) and attempt >= max_retries: - _log_intent_retry_exhausted( - attempt=attempt, max_retries=max_retries, exc=exc - ) - raise asyncio.TimeoutError from exc - raise - last_retryable = exc - backoff_s = _intent_llm_retry_backoff_s(attempt) - _log_intent_retry_will_retry( - attempt=attempt, - max_retries=max_retries, - exc=exc, - timeout_s=attempt_timeout_s, - backoff_s=backoff_s, - ) - await asyncio.sleep(backoff_s) - raise asyncio.TimeoutError from last_retryable - - -def _heuristic_decide(query: str) -> tuple[ToolName, V1Mode, float, str]: - # 轻量启发式:用于 LLM 不可用时保证功能可用 - if is_text2sql_intent(query): - tool: ToolName = "text2sql_query" - mode: V1Mode = "text2sql" - conf = 0.72 - reasoning = "问题更像需要结构化统计/聚合数据,因此优先查询数据库。" - return tool, mode, conf, reasoning - - q = (query or "").lower() - if any(k in q for k in ["翻译", "润色", "改写", "写作", "总结", "概括", "提纲", "生成", "邮件", "周报", "brainstorm", "translate", "polish"]): - tool = "direct_answer" - mode = "no_data" - conf = 0.75 - reasoning = "问题更偏语言处理或创作,不需要检索/查库,直接生成回答。" - return tool, mode, conf, reasoning - - tool = "rag_search" - mode = "rag" - conf = 0.68 - reasoning = "问题更像需要参考内部资料的解释或信息检索,因此选择文档检索。" - return tool, mode, conf, reasoning - - -async def decide_intent_v2( - *, - query: str, - history: list[dict[str, Any]] | None = None, - tools: list[Tool] | None = None, - min_confidence: float = 0.6, - timeout: float = 3.0, - capture_llm_prompts: bool = False, -) -> IntentDecision: - t_start = time.perf_counter() - hist = history or [] - use_tools = tools or [] - - # structured_signals:gating 的关键依赖 - structured = StructuredSignals( - llm_prefers_sql=is_text2sql_intent(query), - has_aggregation_signals=_has_aggregation_keywords(query), - ) - - # 与 tests、benchmark、PROJECT_CONFIG 对齐:关闭时仅启发式/V1 降级,不创建 SiliconFlow client(CI 零外呼)。 - use_intent_llm_raw = (os.getenv("CHATBI_V2_INTENT_LLM", "true") or "").strip().lower() - use_intent_llm = use_intent_llm_raw in ("1", "true", "yes", "on") - - composite_key = _intent_composite_cache_key(query=query, history=hist) - key_obs = _cache_key_obs_hash(composite_key) - - def _latency_ms() -> int: - return int((time.perf_counter() - t_start) * 1000) - - def _return_cache_hit(base: IntentDecision) -> IntentDecision: - lat = _latency_ms() - out = _attach_cache_observability(base, cache="hit", cache_key_hash=key_obs, latency_ms=lat) - _log_intent_cache_line(event="hit", key_hash=key_obs, latency_ms=lat) - return out - - def _return_cache_miss(decision: IntentDecision) -> IntentDecision: - lat = _latency_ms() - _intent_cache.set(composite_key, _intent_decision_for_cache_store(decision)) - out = _attach_cache_observability(decision, cache="miss", cache_key_hash=key_obs, latency_ms=lat) - _log_intent_cache_line(event="miss", key_hash=key_obs, latency_ms=lat) - return out - - cached = _intent_cache.get(composite_key) - if isinstance(cached, IntentDecision): - return _return_cache_hit(cached) - - try: - if use_intent_llm: - # 显式开启:用 LLM 做主决策 - oai = openai_siliconflow_client() - if not use_tools: - # tool registry 为空时退化到启发式(避免空描述影响输出) - tool, mode, conf, reasoning = _heuristic_decide(query) - raw = {"used": "heuristic", "confidence": conf} - fallback = _fallback_tool_by_low_confidence(tool) if conf < min_confidence else None - decision = IntentDecision( - tool=tool, - mode=mode, - reasoning=reasoning, - reasoning_full=reasoning, - confidence=conf, - fallback=fallback, - structured_signals=structured, - raw_response=raw, - ) - return _return_cache_miss(decision) - - t_llm = _effective_intent_llm_timeout_s(timeout) - raw_obj, intent_prompts, retry_meta = await _llm_decide_v2_with_retries( - oai=oai, - query=query, - history=hist, - tools=use_tools, - timeout_s=t_llm, - capture_prompts=capture_llm_prompts, - ) - - tool_raw = raw_obj.get("tool") - reasoning_raw = raw_obj.get("reasoning") - confidence_raw = raw_obj.get("confidence") - tool = str(tool_raw or "").strip() - if tool not in ("rag_search", "text2sql_query", "direct_answer"): - raise ValueError("LLM intent tool 不在允许集合") - tool_t = tool # type: ignore[assignment] - mode = tool_mode_map()[tool_t] - conf = _clamp01(float(confidence_raw if confidence_raw is not None else 0.0)) - reasoning = str(reasoning_raw or "").strip() or "意图识别完成。" - fallback_tool = _fallback_tool_by_low_confidence(tool_t) if conf < min_confidence else None - raw_merged: dict[str, Any] = dict(raw_obj) - if retry_meta.get("used") == "llm_retry": - raw_merged["used"] = "llm_retry" - elif "used" not in raw_merged: - raw_merged["used"] = "llm" - if retry_meta.get("attempt") is not None: - raw_merged["attempt"] = retry_meta.get("attempt") - if retry_meta.get("timeout_s") is not None: - raw_merged["timeout_s"] = retry_meta.get("timeout_s") - if intent_prompts: - raw_merged["llm_prompts"] = intent_prompts - decision2 = IntentDecision( - tool=tool_t, - mode=mode, # type: ignore[arg-type] - reasoning=reasoning[:260], - reasoning_full=reasoning, - confidence=conf, - fallback=fallback_tool, - structured_signals=structured, - raw_response=raw_merged, - ) - decision2 = apply_hints_arbitration(decision2, query=query) - return _return_cache_miss(decision2) - except asyncio.TimeoutError: - # timeout -> 降级到 V1 规则路由 - v1 = decide_intent_v1(query=query, prefer="auto") - tool_t = _map_v1_mode_to_tool(v1.final_mode) - mode_t = tool_mode_map()[tool_t] - reasoning = "意图识别超时,降级到 V1 规则路由。" - conf = float(min_confidence) - fallback = None - decision3 = IntentDecision( - tool=tool_t, - mode=mode_t, # type: ignore[arg-type] - reasoning=reasoning[:260], - reasoning_full=reasoning, - confidence=_clamp01(conf), - fallback=fallback, - structured_signals=structured, - raw_response={"used": "v1_fallback", "confidence": conf}, - ) - return _return_cache_miss(decision3) - except Exception: - # LLM 失败/输出不符合预期 -> 启发式降级 - pass - - # 默认:启发式(保证可用性 + 运行成本可控) - tool_h, mode_h, conf_h, reasoning_h = _heuristic_decide(query) - fallback_tool_h = _fallback_tool_by_low_confidence(tool_h) if conf_h < min_confidence else None - # 还可追加一次:低置信度时向 V1 规则路由对齐(作为“超时回退”一致性) - if conf_h < min_confidence and use_tools: - try: - v1 = decide_intent_v1(query=query, prefer="auto") - tool_h = _map_v1_mode_to_tool(v1.final_mode) - mode_h = tool_mode_map()[tool_h] - fallback_tool_h = _fallback_tool_by_low_confidence(tool_h) if conf_h < min_confidence else None - except Exception: # noqa: BLE001 - pass - - decision4 = IntentDecision( - tool=tool_h, - mode=mode_h, - reasoning=reasoning_h, - reasoning_full=reasoning_h, - confidence=conf_h, - fallback=fallback_tool_h, - structured_signals=structured, - raw_response={"used": "heuristic", "confidence": conf_h}, - ) - return _return_cache_miss(decision4) - +from . import intent_llm as _intent_llm_mod # noqa: E402 + +LRUCache = _intent_llm_mod.LRUCache +_intent_cache = _intent_llm_mod._intent_cache +_llm_decide_v2 = _intent_llm_mod._llm_decide_v2 +_intent_llm_retryable = _intent_llm_mod._intent_llm_retryable +_intent_composite_cache_key = _intent_llm_mod._intent_composite_cache_key +clear_intent_cache = _intent_llm_mod.clear_intent_cache +compute_history_hash = _intent_llm_mod.compute_history_hash +decide_intent_v2 = _intent_llm_mod.decide_intent_v2 + +__all__ = [ + "IntentDecision", + "LRUCache", + "StructuredSignals", + "ToolName", + "V1Mode", + "apply_hints_arbitration", + "build_intent_path_obs", + "clear_intent_cache", + "compute_history_hash", + "decide_intent_v1", + "decide_intent_v2", + "openai_siliconflow_client", +] diff --git a/api/intent_llm.py b/api/intent_llm.py new file mode 100644 index 00000000..d317145a --- /dev/null +++ b/api/intent_llm.py @@ -0,0 +1,656 @@ +"""Intent V2:LRU 缓存、LLM 外呼与重试、`decide_intent_v2` 编排。""" + +from __future__ import annotations + +import asyncio +import hashlib +import json +import logging +import os +import re +import time +from collections import OrderedDict +from dataclasses import replace +from typing import TYPE_CHECKING, Any, Literal + +from openai import ( + APIConnectionError, + APIStatusError, + APITimeoutError, + InternalServerError, + OpenAI, + RateLimitError, +) + +from .intent_hints import build_intent_hints_prompt_block, load_resolved_hints +from .text2sql_core import is_text2sql_intent +from .tools import Tool, tool_mode_map + +if TYPE_CHECKING: + from .intent_agent import IntentDecision, ToolName, V1Mode + +_logger = logging.getLogger("api.intent_agent") + +_VOLATILE_RAW_KEYS = frozenset({"cache", "cache_key_hash", "latency_ms", "llm_prompts"}) + + +class LRUCache: + """IntentDecision 缓存:TTL 到期失效 + 超容量 LRU 淘汰。""" + + def __init__(self, *, maxsize: int, ttl_s: float) -> None: + self._maxsize = maxsize + self._ttl_s = ttl_s + self._items: OrderedDict[str, tuple[float, Any]] = OrderedDict() + + def clear(self) -> None: + self._items.clear() + + def get(self, key: str) -> Any | None: + now = time.time() + it = self._items.get(key) + if not it: + return None + expires_at, val = it + if expires_at <= now: + try: + del self._items[key] + except KeyError: + pass + return None + self._items.move_to_end(key) + return val + + def set(self, key: str, val: Any) -> None: + expires_at = time.time() + self._ttl_s + self._items[key] = (expires_at, val) + self._items.move_to_end(key) + while len(self._items) > self._maxsize: + self._items.popitem(last=False) + + +_intent_cache: LRUCache = LRUCache(maxsize=1000, ttl_s=300.0) + + +def _agent_cache() -> LRUCache: + from . import intent_agent as ia + + return ia._intent_cache + + +def clear_intent_cache() -> None: + """供基准脚本 / 测试做「冷启动」轮次清空缓存。""" + _agent_cache().clear() + + +def _debug_intent_cache_enabled() -> bool: + raw = (os.getenv("DEBUG_INTENT_CACHE", "") or "").strip().lower() + return raw in ("1", "true", "yes", "on") + + +def _intent_history_tail_for_hash(history: list[dict[str, Any]]) -> list[dict[str, str]]: + """最近 3 轮对话:最多 6 条 user/assistant 消息,仅 role + content。""" + tail = (history or [])[-6:] + out: list[dict[str, str]] = [] + for m in tail: + role = str(m.get("role", "") or "") + content = str(m.get("content", "") or "") + out.append({"role": role, "content": content}) + return out + + +def compute_history_hash(history: list[dict[str, Any]] | None) -> str: + """稳定 history 指纹:JSON sort_keys + sha256 前 16 hex。""" + normalized = _intent_history_tail_for_hash(history or []) + payload = json.dumps(normalized, sort_keys=True, ensure_ascii=False) + return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16] + + +def _intent_composite_cache_key(*, query: str, history: list[dict[str, Any]] | None) -> str: + """缓存主键:history_hash + query,不同 history 不会共用条目。""" + hh = compute_history_hash(history) + return f"{hh}\x1f{(query or '').strip()}" + + +def _cache_key_obs_hash(composite_key: str) -> str: + """可观测短哈希:不暴露 query 明文。""" + return hashlib.sha256(composite_key.encode("utf-8")).hexdigest()[:16] + + +def _raw_response_for_cache_store(raw: dict[str, Any]) -> dict[str, Any]: + """写入缓存前去掉本轮可观测字段与大段 prompt,避免污染命中副本。""" + return {k: v for k, v in raw.items() if k not in _VOLATILE_RAW_KEYS} + + +def _log_intent_cache_line(*, event: str, key_hash: str, latency_ms: int) -> None: + if not _debug_intent_cache_enabled(): + return + _logger.info("[intent-cache] %s key_hash=%s latency_ms=%s", event, key_hash, latency_ms) + + +def _exc_short_label(exc: BaseException) -> str: + return type(exc).__name__ + + +def _log_intent_retry_exhausted(*, attempt: int, max_retries: int, exc: BaseException) -> None: + _logger.warning( + "[intent-retry] exhausted attempt=%s max=%s err=%s → v1_fallback", + attempt, + max_retries, + _exc_short_label(exc), + ) + + +def _log_intent_retry_will_retry( + *, + attempt: int, + max_retries: int, + exc: BaseException, + timeout_s: float, + backoff_s: float, +) -> None: + _logger.warning( + "[intent-retry] attempt=%s/%s failed err=%s timeout_s=%s backoff_s=%s will_retry", + attempt, + max_retries, + _exc_short_label(exc), + timeout_s, + backoff_s, + ) + + +def _log_intent_retry_success(*, attempt: int, max_retries: int, timeout_s: float) -> None: + _logger.info( + "[intent-retry] success attempt=%s/%s timeout_s=%s", + attempt, + max_retries, + timeout_s, + ) + + +def _intent_decision_for_cache_store(d: IntentDecision) -> IntentDecision: + return replace(d, raw_response=_raw_response_for_cache_store(dict(d.raw_response))) + + +def _attach_cache_observability( + d: IntentDecision, + *, + cache: Literal["hit", "miss"], + cache_key_hash: str, + latency_ms: int, +) -> IntentDecision: + merged = {**dict(d.raw_response), "cache": cache, "cache_key_hash": cache_key_hash, "latency_ms": latency_ms} + return replace(d, raw_response=merged) + + +def _clamp01(x: float) -> float: + if x < 0.0: + return 0.0 + if x > 1.0: + return 1.0 + return x + + +def _has_aggregation_keywords(query: str) -> bool: + q = (query or "").lower() + needles = ( + "多少", + "金额", + "收入", + "支出", + "人数", + "数量", + "总数", + "平均", + "最大", + "最小", + "top", + "排行", + "排名", + "趋势", + "对比", + "分组", + "group by", + "count", + "sum", + "avg", + ) + return any(n in q for n in needles) + + +def _fallback_tool_by_low_confidence(tool: ToolName) -> ToolName: + if tool == "text2sql_query": + return "rag_search" + if tool == "rag_search": + return "direct_answer" + return "rag_search" + + +def _map_v1_mode_to_tool(mode: str) -> ToolName: + m = (mode or "").strip().lower() + if m == "text2sql": + return "text2sql_query" + if m == "rag": + return "rag_search" + return "direct_answer" + + +def _extract_json_obj(text: str) -> dict[str, Any] | None: + s = text or "" + m = re.search(r"\{[\s\S]*\}", s) + if not m: + return None + raw = m.group(0) + try: + obj = json.loads(raw) + except Exception: # noqa: BLE001 + return None + if isinstance(obj, dict): + return obj + return None + + +async def _llm_decide_v2( + *, + oai: OpenAI, + query: str, + history: list[dict[str, Any]], + tools: list[Tool], + timeout_s: float, + capture_prompts: bool = False, +) -> tuple[dict[str, Any], list[dict[str, Any]] | None]: + tools_desc = "\n\n".join([f"- {t.name}: {t.description}" for t in tools]) + history_block = "\n".join( + [f"{m.get('role', '?')}: {str(m.get('content', ''))[:200]}" for m in (history or [])[-6:]] + ).strip() or "无历史对话" + + hints_block = build_intent_hints_prompt_block(load_resolved_hints()).strip() + hints_section = f"\n\n{hints_block}\n" if hints_block else "" + + prompt = f"""你是 ChatBI V2 的意图识别器:在下列工具中选**恰好一个**,用于本仓库/本产品的对话路由(评测集亦按此口径)。 + +## 可用工具(描述以注册表为准) +{tools_desc} + +## 总原则 + +- **text2sql_query**:用户要**本库业务数据**的具体数值/排名/趋势/分组统计,且应由数据库查询给出答案。 +- **rag_search**:需要**项目内文档、规范、任务单、架构说明、评测口径、错误码约定、实现细节**等;或问题明显落在「本仓库怎么说/怎么做」而非百科通识一句带过。 +- **direct_answer**:**不依赖**内部文档即可完成——翻译、润色、创作、头脑风暴、纯算法题/语法教学、与当前产品/仓库无关的通识科普等。 +{hints_section} +## 「通用知识」vs「须查资料」(易错点) + +下列主题若**未**明确说「只要高中数学定义、不要项目文档」,在本产品中默认走 **rag_search**(便于对齐内部文档与评测口径): + +- 指标与评测:**macro-F1**、**confusion matrix**、准确率/Precision/Recall、分桶统计等; +- 工程约束:**CI**、**stub**、零外呼门禁、**P50/P95** 基准写法; +- 仓库与规范:**_tech_graph**、**intent_router**、**Supabase** 在本项目中的错误处理约定等。 + +若用户只要**与项目无关**的通识(例:「用通俗语言解释量子计算」),选 **direct_answer**。 + +## 多轮对话 + +- 必须结合 **历史对话** 做指代消解:「它/那/这个」继承上文主题。 +- 若上文在讨论 **Text2SQL / 销售额 / 查库**,本轮问「**要不要查数据库**」「**是否要走 SQL**」「路由边界」等——属于**产品能力/路由说明**,选 **rag_search**(查文档说明),**不要**因字面像常识而选 direct_answer。 +- 若上文是写作/翻译/生成示例代码,本轮续写、改写语气、再要例子——多为 **direct_answer**。 + +## 与 text2sql 的边界 + +| 用户问题 | 选择 | +|---------|------| +| 「昨天销售额是多少」 | **text2sql_query**(要真实数据) | +| 「怎么统计 heros 表」且明显教写法、不要执行 | **direct_answer** | +| 「heros 表有哪些字段」 | **rag_search** | + +## Few-shot(短) + +Q: 昨天销售额是多少? +{{"tool": "text2sql_query", "reasoning": "需要查库得到金额", "confidence": 0.95}} + +Q: 如何计算 confusion matrix +{{"tool": "rag_search", "reasoning": "评测/文档口径,宜检索项目内说明", "confidence": 0.9}} + +Q: 解释一下量子计算,用通俗语言 +{{"tool": "direct_answer", "reasoning": "与仓库无关的通识解释", "confidence": 0.88}} + +[历史] +user: 昨天销售额是多少 +assistant: 我可以通过 Text2SQL 去查询…… +user: 那需要查数据库吗 +{{"tool": "rag_search", "reasoning": "结合上文仍在谈查数路由,应查文档说明是否走库", "confidence": 0.86}} + +## 历史对话 +{history_block} + +## 当前用户问题 +{query} + +## 输出 +仅输出一个 JSON 对象,勿其它文字: +{{ + "tool": "rag_search | text2sql_query | direct_answer", + "reasoning": "用户可见的 1-2 句摘要", + "confidence": 0.0-1.0 +}} +""" + + intent_model = os.getenv("INTENT_LLM_MODEL", "deepseek-ai/DeepSeek-V4-Pro") + intent_messages: list[dict[str, str]] = [ + { + "role": "system", + "content": ( + "你是严谨的意图分类器。仅从工具名集合中选择;" + "只输出一个 JSON 对象,不要 Markdown、不要前后缀说明。" + ), + }, + {"role": "user", "content": prompt}, + ] + + def _sync_call() -> str: + res = oai.chat.completions.create( + model=intent_model, + messages=intent_messages, + temperature=0.0, + stream=False, + ) + return (res.choices[0].message.content or "").strip() + + text = await asyncio.wait_for(asyncio.to_thread(_sync_call), timeout=timeout_s) + obj = _extract_json_obj(text) + if obj is None: + raise ValueError("LLM intent 输出不是合法 JSON") + prompts: list[dict[str, Any]] | None = None + if capture_prompts: + prompts = [{"phase": "intent", "model": intent_model, "messages": intent_messages}] + return obj, prompts + + +def _effective_intent_llm_timeout_s(override: float) -> float: + """Intent LLM 单次 `wait_for` 上限:优先读 `CHATBI_V2_INTENT_TIMEOUT_S`,否则用调用方传入值。""" + raw = (os.getenv("CHATBI_V2_INTENT_TIMEOUT_S") or "").strip() + if raw: + try: + return max(0.5, min(120.0, float(raw))) + except ValueError: + pass + return max(0.5, min(120.0, float(override))) + + +def _intent_llm_max_retries() -> int: + raw = (os.getenv("CHATBI_V2_INTENT_LLM_RETRIES") or "").strip() + if raw: + try: + return max(1, min(5, int(raw))) + except ValueError: + pass + return 3 + + +def _intent_llm_retry_backoff_s(attempt: int) -> float: + base = 0.15 + raw = (os.getenv("CHATBI_V2_INTENT_LLM_RETRY_BACKOFF_S") or "").strip() + if raw: + try: + base = max(0.0, min(5.0, float(raw))) + except ValueError: + pass + return base * (2 ** max(0, attempt - 1)) + + +def _intent_llm_retry_timeout_factors() -> tuple[float, ...]: + raw = (os.getenv("CHATBI_V2_INTENT_RETRY_TIMEOUT_FACTORS") or "").strip() + if raw: + try: + parsed = tuple(max(0.1, min(1.0, float(x.strip()))) for x in raw.split(",") if x.strip()) + if parsed: + return parsed + except ValueError: + pass + return (1.0, 0.65, 0.4) + + +def _intent_llm_timeout_s_for_attempt(base_timeout_s: float, attempt: int) -> float: + factors = _intent_llm_retry_timeout_factors() + idx = min(max(attempt, 1) - 1, len(factors) - 1) + scaled = base_timeout_s * factors[idx] + return max(0.5, min(120.0, scaled)) + + +def _intent_llm_retryable(exc: BaseException) -> bool: + if isinstance(exc, (asyncio.TimeoutError, APITimeoutError)): + return True + if isinstance(exc, (APIConnectionError, InternalServerError, RateLimitError)): + return True + if isinstance(exc, APIStatusError): + code = getattr(exc, "status_code", None) + return code in (429, 502, 503, 504) + return False + + +async def _llm_decide_v2_with_retries( + *, + oai: OpenAI, + query: str, + history: list[dict[str, Any]], + tools: list[Tool], + timeout_s: float, + capture_prompts: bool = False, +) -> tuple[dict[str, Any], list[dict[str, Any]] | None, dict[str, Any]]: + """包装 `_llm_decide_v2`:可重试错误最多 `CHATBI_V2_INTENT_LLM_RETRIES` 次,耗尽后抛 TimeoutError 走 V1。""" + from . import intent_agent as ia + + max_retries = _intent_llm_max_retries() + last_retryable: BaseException | None = None + for attempt in range(1, max_retries + 1): + attempt_timeout_s = _intent_llm_timeout_s_for_attempt(timeout_s, attempt) + try: + raw_obj, intent_prompts = await ia._llm_decide_v2( + oai=oai, + query=query, + history=history, + tools=tools, + timeout_s=attempt_timeout_s, + capture_prompts=capture_prompts, + ) + meta: dict[str, Any] = {"attempt": attempt, "timeout_s": attempt_timeout_s} + if attempt > 1: + meta["used"] = "llm_retry" + _log_intent_retry_success( + attempt=attempt, max_retries=max_retries, timeout_s=attempt_timeout_s + ) + return raw_obj, intent_prompts, meta + except Exception as exc: # noqa: BLE001 + if not _intent_llm_retryable(exc) or attempt >= max_retries: + if _intent_llm_retryable(exc) and attempt >= max_retries: + _log_intent_retry_exhausted( + attempt=attempt, max_retries=max_retries, exc=exc + ) + raise asyncio.TimeoutError from exc + raise + last_retryable = exc + backoff_s = _intent_llm_retry_backoff_s(attempt) + _log_intent_retry_will_retry( + attempt=attempt, + max_retries=max_retries, + exc=exc, + timeout_s=attempt_timeout_s, + backoff_s=backoff_s, + ) + await asyncio.sleep(backoff_s) + raise asyncio.TimeoutError from last_retryable + + +def _heuristic_decide(query: str) -> tuple[ToolName, V1Mode, float, str]: + if is_text2sql_intent(query): + tool: ToolName = "text2sql_query" + mode: V1Mode = "text2sql" + conf = 0.72 + reasoning = "问题更像需要结构化统计/聚合数据,因此优先查询数据库。" + return tool, mode, conf, reasoning + + q = (query or "").lower() + if any(k in q for k in ["翻译", "润色", "改写", "写作", "总结", "概括", "提纲", "生成", "邮件", "周报", "brainstorm", "translate", "polish"]): + tool = "direct_answer" + mode = "no_data" + conf = 0.75 + reasoning = "问题更偏语言处理或创作,不需要检索/查库,直接生成回答。" + return tool, mode, conf, reasoning + + tool = "rag_search" + mode = "rag" + conf = 0.68 + reasoning = "问题更像需要参考内部资料的解释或信息检索,因此选择文档检索。" + return tool, mode, conf, reasoning + + +async def decide_intent_v2( + *, + query: str, + history: list[dict[str, Any]] | None = None, + tools: list[Tool] | None = None, + min_confidence: float = 0.6, + timeout: float = 3.0, + capture_llm_prompts: bool = False, +) -> IntentDecision: + from . import intent_agent as ia + + t_start = time.perf_counter() + hist = history or [] + use_tools = tools or [] + + structured = ia.StructuredSignals( + llm_prefers_sql=is_text2sql_intent(query), + has_aggregation_signals=_has_aggregation_keywords(query), + ) + + use_intent_llm_raw = (os.getenv("CHATBI_V2_INTENT_LLM", "true") or "").strip().lower() + use_intent_llm = use_intent_llm_raw in ("1", "true", "yes", "on") + + composite_key = _intent_composite_cache_key(query=query, history=hist) + key_obs = _cache_key_obs_hash(composite_key) + + def _latency_ms() -> int: + return int((time.perf_counter() - t_start) * 1000) + + def _return_cache_hit(base: IntentDecision) -> IntentDecision: + lat = _latency_ms() + out = _attach_cache_observability(base, cache="hit", cache_key_hash=key_obs, latency_ms=lat) + _log_intent_cache_line(event="hit", key_hash=key_obs, latency_ms=lat) + return out + + def _return_cache_miss(decision: IntentDecision) -> IntentDecision: + lat = _latency_ms() + _agent_cache().set(composite_key, _intent_decision_for_cache_store(decision)) + out = _attach_cache_observability(decision, cache="miss", cache_key_hash=key_obs, latency_ms=lat) + _log_intent_cache_line(event="miss", key_hash=key_obs, latency_ms=lat) + return out + + cached = _agent_cache().get(composite_key) + if isinstance(cached, ia.IntentDecision): + return _return_cache_hit(cached) + + try: + if use_intent_llm: + oai = ia.openai_siliconflow_client() + if not use_tools: + tool, mode, conf, reasoning = _heuristic_decide(query) + raw = {"used": "heuristic", "confidence": conf} + fallback = _fallback_tool_by_low_confidence(tool) if conf < min_confidence else None + decision = ia.IntentDecision( + tool=tool, + mode=mode, + reasoning=reasoning, + reasoning_full=reasoning, + confidence=conf, + fallback=fallback, + structured_signals=structured, + raw_response=raw, + ) + return _return_cache_miss(decision) + + t_llm = _effective_intent_llm_timeout_s(timeout) + raw_obj, intent_prompts, retry_meta = await _llm_decide_v2_with_retries( + oai=oai, + query=query, + history=hist, + tools=use_tools, + timeout_s=t_llm, + capture_prompts=capture_llm_prompts, + ) + + tool_raw = raw_obj.get("tool") + reasoning_raw = raw_obj.get("reasoning") + confidence_raw = raw_obj.get("confidence") + tool = str(tool_raw or "").strip() + if tool not in ("rag_search", "text2sql_query", "direct_answer"): + raise ValueError("LLM intent tool 不在允许集合") + tool_t = tool # type: ignore[assignment] + mode = tool_mode_map()[tool_t] + conf = _clamp01(float(confidence_raw if confidence_raw is not None else 0.0)) + reasoning = str(reasoning_raw or "").strip() or "意图识别完成。" + fallback_tool = _fallback_tool_by_low_confidence(tool_t) if conf < min_confidence else None + raw_merged: dict[str, Any] = dict(raw_obj) + if retry_meta.get("used") == "llm_retry": + raw_merged["used"] = "llm_retry" + elif "used" not in raw_merged: + raw_merged["used"] = "llm" + if retry_meta.get("attempt") is not None: + raw_merged["attempt"] = retry_meta.get("attempt") + if retry_meta.get("timeout_s") is not None: + raw_merged["timeout_s"] = retry_meta.get("timeout_s") + if intent_prompts: + raw_merged["llm_prompts"] = intent_prompts + decision2 = ia.IntentDecision( + tool=tool_t, + mode=mode, # type: ignore[arg-type] + reasoning=reasoning[:260], + reasoning_full=reasoning, + confidence=conf, + fallback=fallback_tool, + structured_signals=structured, + raw_response=raw_merged, + ) + decision2 = ia.apply_hints_arbitration(decision2, query=query) + return _return_cache_miss(decision2) + except asyncio.TimeoutError: + v1 = ia.decide_intent_v1(query=query, prefer="auto") + tool_t = _map_v1_mode_to_tool(v1.final_mode) + mode_t = tool_mode_map()[tool_t] + reasoning = "意图识别超时,降级到 V1 规则路由。" + conf = float(min_confidence) + fallback = None + decision3 = ia.IntentDecision( + tool=tool_t, + mode=mode_t, # type: ignore[arg-type] + reasoning=reasoning[:260], + reasoning_full=reasoning, + confidence=_clamp01(conf), + fallback=fallback, + structured_signals=structured, + raw_response={"used": "v1_fallback", "confidence": conf}, + ) + return _return_cache_miss(decision3) + except Exception: + pass + + tool_h, mode_h, conf_h, reasoning_h = _heuristic_decide(query) + fallback_tool_h = _fallback_tool_by_low_confidence(tool_h) if conf_h < min_confidence else None + if conf_h < min_confidence and use_tools: + try: + v1 = ia.decide_intent_v1(query=query, prefer="auto") + tool_h = _map_v1_mode_to_tool(v1.final_mode) + mode_h = tool_mode_map()[tool_h] + fallback_tool_h = _fallback_tool_by_low_confidence(tool_h) if conf_h < min_confidence else None + except Exception: # noqa: BLE001 + pass + + decision4 = ia.IntentDecision( + tool=tool_h, + mode=mode_h, + reasoning=reasoning_h, + reasoning_full=reasoning_h, + confidence=conf_h, + fallback=fallback_tool_h, + structured_signals=structured, + raw_response={"used": "heuristic", "confidence": conf_h}, + ) + return _return_cache_miss(decision4) diff --git a/api/intent_router.py b/api/intent_router.py index 18780e9d..88b8929e 100644 --- a/api/intent_router.py +++ b/api/intent_router.py @@ -1,11 +1,16 @@ from __future__ import annotations import os -import re from dataclasses import dataclass from typing import Any, Literal -from .intent_hints import load_resolved_hints, rag_rule_hits_from_hints +from .intent_router_rules import ( + is_safe_count_query, + no_data_rule_hits, + rag_rule_hits, + sql_rule_hits, + tool_rule_hits, +) from .rag_env import supabase_client from .text2sql_store import get_text2sql_store @@ -22,172 +27,6 @@ class RouterDecision: fallback: str | None -def _contains_any(text: str, needles: list[str]) -> bool: - t = (text or "").lower() - return any(n.lower() in t for n in needles) - - -def _rag_rule_hits(query: str) -> list[str]: - """偏「读仓库内文档 / 日记 / 任务说明」的轻量规则,用于在 auto 下优先走 RAG 候选。""" - hits: list[str] = [] - q = (query or "").strip() - if not q: - return hits - - rag_kw = [ - "日记", - "diary", - "markdown", - ".md", - "任务单", - "规范", - "架构说明", - "_tech_graph", - "文档", - "文章", - "博客", - "写了什么", - "内容是什么", - "讲了什么", - "说了什么", - "摘录", - "这篇", - "那篇", - "哪篇", - "readme", - ] - if _contains_any(q, rag_kw): - hits.append("rule:rag_keywords") - - # 任务编号 / 章节式引用(偏文档检索) - if re.search(r"\btask\s*\d+", q, re.IGNORECASE): - hits.append("rule:task_ref_hint") - - # 日期 + 文档形态(如 2026-04-28.md、某天的日记) - if re.search(r"\d{4}-\d{2}-\d{2}", q) and (".md" in q.lower() or "日记" in q or "diary" in q.lower()): - hits.append("rule:date_doc_hint") - - if "content/" in q.lower() or "task_" in q.lower(): - hits.append("rule:repo_path_hint") - - try: - hints = load_resolved_hints() - if hints: - for h in rag_rule_hits_from_hints(q, hints): - if h not in hits: - hits.append(h) - except Exception: # noqa: BLE001 - pass - - return hits - - -def _no_data_rule_hits(query: str) -> list[str]: - hits: list[str] = [] - q = (query or "").strip() - if not q: - return hits - no_data_kw = [ - "润色", - "改写", - "翻译", - "写作", - "起标题", - "总结", - "概括", - "生成", - "提纲", - "头脑风暴", - "邮件", - "周报", - "改成更正式", - "rewrite", - "polish", - "translate", - "brainstorm", - ] - if _contains_any(q, no_data_kw): - hits.append("rule:no_data_keywords") - return hits - - -def _sql_rule_hits(query: str) -> list[str]: - hits: list[str] = [] - q = (query or "").strip() - if not q: - return hits - sql_kw = [ - "查询", - "统计", - "多少", - "金额", - "人数", - "数量", - "总数", - "平均", - "最大", - "最小", - "top", - "排行", - "分组", - "汇总", - "group by", - "count", - "sum", - "avg", - ] - if _contains_any(q, sql_kw): - hits.append("rule:sql_keywords") - # 明确提表名/字段名的倾向(snake_case) - if re.search(r"\b[a-z][a-z0-9_]{2,}\b", q): - hits.append("rule:identifier_hint") - return hits - - -def _is_safe_count_query(query: str) -> bool: - """是否属于“安全 COUNT 查询”。 - - 目标:避免在 DDL evidence=0 时把明确的计数类查询误降级到 no_data。 - 约束:仅放行“计数/总数”这一类不依赖列信息的查询意图。 - """ - q = (query or "").strip().lower() - if not q: - return False - - # 必须出现“表”与一个可疑标识符(表名倾向) - has_table_word = "表" in q - has_identifier_hint = re.search(r"\b[a-z][a-z0-9_]{2,}\b", q) is not None - if not (has_table_word and has_identifier_hint): - return False - - # 必须是计数语义(不要求精确关键词匹配,但至少要有明显的 count/总数/多少条) - count_needles = ( - "count", - "总数", - "多少条", - "多少行", - "有多少", - "多少条数据", - "条数据", - "记录数", - "条记录", - ) - if any(n in q for n in count_needles): - return True - - # “统计 + 表 + 多少/条”也视为计数类(中文口语常见) - if "统计" in q and ("多少" in q or "条" in q): - return True - - return False - - -def _tool_rule_hits(query: str) -> list[str]: - # v1 仅预留,返回空 - _ = query - return [] - - def _ddl_evidence(query: str) -> tuple[int, float | None]: """Text2SQL evidence:DDL 命中数与 top1 分数(若可得)。""" topk = int(os.getenv("INTENT_DDL_EVIDENCE_TOPK", "3")) @@ -252,16 +91,15 @@ def decide_intent(*, query: str, prefer: str) -> RouterDecision: ) rule_hits: list[str] = [] - tool_hits = _tool_rule_hits(query) - rag_hits = _rag_rule_hits(query) - sql_hits = _sql_rule_hits(query) - nodata_hits = _no_data_rule_hits(query) + tool_hits = tool_rule_hits(query) + rag_hits = rag_rule_hits(query) + sql_hits = sql_rule_hits(query) + nodata_hits = no_data_rule_hits(query) rule_hits.extend(tool_hits) rule_hits.extend(rag_hits) rule_hits.extend(sql_hits) rule_hits.extend(nodata_hits) - # candidate selection(rag 信号优先于纯 SQL 关键词 / no_data 创作词,减少「查日记」误成 text2sql) candidate: str if tool_hits: candidate = "tool:unknown" @@ -277,7 +115,6 @@ def decide_intent(*, query: str, prefer: str) -> RouterDecision: evidence: dict[str, Any] = {} fallback: str | None = None - # evidence check ddl_hits = 0 ddl_top: float | None = None fts_hits = 0 @@ -304,13 +141,10 @@ def decide_intent(*, query: str, prefer: str) -> RouterDecision: final_mode = candidate - # protect: sql needs ddl evidence if candidate == "text2sql" and ddl_hits <= 0: - # 特判:明确的“统计某表有多少条”属于安全 COUNT 查询,不依赖列信息,允许直接走 text2sql。 - if _is_safe_count_query(query): + if is_safe_count_query(query): final_mode = "text2sql" fallback = "text2sql_without_ddl_allowed_safe_count" - # fall back: prefer rag if fts has evidence else no_data elif fts_hits > 0: final_mode = "rag" fallback = "text2sql_without_ddl→rag" @@ -318,7 +152,6 @@ def decide_intent(*, query: str, prefer: str) -> RouterDecision: final_mode = "no_data" fallback = "text2sql_without_ddl→no_data" - # protect: rag 在 FTS 无命中时,仅当 DDL 与 FTS 两侧都无有效信号且无 rag 规则命中时再降为 no_data if final_mode == "rag" and fts_hits <= 0: if ddl_hits > 0 and sql_hits: final_mode = "text2sql" @@ -342,4 +175,3 @@ def decide_intent(*, query: str, prefer: str) -> RouterDecision: evidence=evidence, fallback=fallback, ) - diff --git a/api/intent_router_rules.py b/api/intent_router_rules.py new file mode 100644 index 00000000..e53061db --- /dev/null +++ b/api/intent_router_rules.py @@ -0,0 +1,166 @@ +"""V1 意图路由:表驱动规则命中(keyword / regex / hints)。""" + +from __future__ import annotations + +import re + +from .intent_hints import load_resolved_hints, rag_rule_hits_from_hints + + +def _contains_any(text: str, needles: list[str]) -> bool: + t = (text or "").lower() + return any(n.lower() in t for n in needles) + + +def rag_rule_hits(query: str) -> list[str]: + """偏「读仓库内文档 / 日记 / 任务说明」的轻量规则,用于在 auto 下优先走 RAG 候选。""" + hits: list[str] = [] + q = (query or "").strip() + if not q: + return hits + + rag_kw = [ + "日记", + "diary", + "markdown", + ".md", + "任务单", + "规范", + "架构说明", + "_tech_graph", + "文档", + "文章", + "博客", + "写了什么", + "内容是什么", + "讲了什么", + "说了什么", + "摘录", + "这篇", + "那篇", + "哪篇", + "readme", + ] + if _contains_any(q, rag_kw): + hits.append("rule:rag_keywords") + + if re.search(r"\btask\s*\d+", q, re.IGNORECASE): + hits.append("rule:task_ref_hint") + + if re.search(r"\d{4}-\d{2}-\d{2}", q) and (".md" in q.lower() or "日记" in q or "diary" in q.lower()): + hits.append("rule:date_doc_hint") + + if "content/" in q.lower() or "task_" in q.lower(): + hits.append("rule:repo_path_hint") + + try: + hints = load_resolved_hints() + if hints: + for h in rag_rule_hits_from_hints(q, hints): + if h not in hits: + hits.append(h) + except Exception: # noqa: BLE001 + pass + + return hits + + +def no_data_rule_hits(query: str) -> list[str]: + hits: list[str] = [] + q = (query or "").strip() + if not q: + return hits + no_data_kw = [ + "润色", + "改写", + "翻译", + "写作", + "起标题", + "总结", + "概括", + "生成", + "提纲", + "头脑风暴", + "邮件", + "周报", + "改成更正式", + "rewrite", + "polish", + "translate", + "brainstorm", + ] + if _contains_any(q, no_data_kw): + hits.append("rule:no_data_keywords") + return hits + + +def sql_rule_hits(query: str) -> list[str]: + hits: list[str] = [] + q = (query or "").strip() + if not q: + return hits + sql_kw = [ + "查询", + "统计", + "多少", + "金额", + "人数", + "数量", + "总数", + "平均", + "最大", + "最小", + "top", + "排行", + "分组", + "汇总", + "group by", + "count", + "sum", + "avg", + ] + if _contains_any(q, sql_kw): + hits.append("rule:sql_keywords") + if re.search(r"\b[a-z][a-z0-9_]{2,}\b", q): + hits.append("rule:identifier_hint") + return hits + + +def is_safe_count_query(query: str) -> bool: + """是否属于“安全 COUNT 查询”。 + + 目标:避免在 DDL evidence=0 时把明确的计数类查询误降级到 no_data。 + 约束:仅放行“计数/总数”这一类不依赖列信息的查询意图。 + """ + q = (query or "").strip().lower() + if not q: + return False + + has_table_word = "表" in q + has_identifier_hint = re.search(r"\b[a-z][a-z0-9_]{2,}\b", q) is not None + if not (has_table_word and has_identifier_hint): + return False + + count_needles = ( + "count", + "总数", + "多少条", + "多少行", + "有多少", + "多少条数据", + "条数据", + "记录数", + "条记录", + ) + if any(n in q for n in count_needles): + return True + + if "统计" in q and ("多少" in q or "条" in q): + return True + + return False + + +def tool_rule_hits(query: str) -> list[str]: + _ = query + return [] diff --git a/docs/tasks/active/task_api_intent_stack_split_w8.md b/docs/tasks/active/task_api_intent_stack_split_w8.md new file mode 100644 index 00000000..dab9507d --- /dev/null +++ b/docs/tasks/active/task_api_intent_stack_split_w8.md @@ -0,0 +1,81 @@ +> **epic**: `standards-engineering/api-modularization` +> **manifest_ref**: W8 · task_standards_backend_api_modularization_manifest_v1.md +> **test_strategy**: `required` +> **非范围**: MANIFEST 表内未列出的 `api/*.py` 文件 + +--- + +# W8 · Intent 栈拆分 + +> **状态**: active +> **slug**: `api-intent-stack-split` +> **git_branch**: `task/api-intent-w8` +> **风险**: Medium +> **freeze_id**: `CODING_BACKEND_L2@2026-06-09` + +--- + +## Harness 元信息 + +| 字段 | 值 | +|------|-----| +| **task_slug** | `api-intent-stack-split` | +| **git_branch** | `task/api-intent-w8` | +| **orchestration** | Cursor Task 链 | +| **test_strategy** | `required` | +| **freeze_id** | `CODING_BACKEND_L2@2026-06-09` | + +--- + +## 目标 + +将 `intent_agent.py` / `intent_router.py` 中表驱动规则与 LLM 路径分文件;公开 import 不变。 + +### 下沉范围 + +| 模块 | 说明 | +|------|------| +| `api/intent_router_rules.py` | V1 表驱动规则(rag/sql/no_data keyword 命中) | +| `api/intent_llm.py` | LRU 缓存、LLM 外呼/重试、`decide_intent_v2` | + +--- + +## 行为变更(Delta) + +### ADDED +- `api/intent_router_rules.py` · `api/intent_llm.py` + +### MODIFIED +- `api/intent_agent.py` — 模型 + hints 仲裁 + re-export +- `api/intent_router.py` — 保留 `decide_intent()` 入口,规则下沉 + +### 不变 +- `from api.intent_agent import decide_intent_v2, IntentDecision, ...` +- `from api.intent_router import decide_intent` +- `api/agent.py` 须保留 `decide_intent_v1` import 绑定 + +--- + +## 失败路径 + +| # | Scenario ID | 触发 | 行为 | +|---|-------------|------|------| +| F1 | fp-intent-split-break | 拆分破坏 intent 路由/缓存/重试 | pytest 阻塞 merge | + +--- + +## 验收标准 + +- [x] 子模块存在且 ruff 绿 +- [x] `decide_intent()` 仍在 `api/intent_router.py` +- [x] `decide_intent_v2()` 仍可从 `api/intent_agent` import +- [x] `pytest tests -m "not intent_eval and not intent_benchmark"` 全绿 +- [x] 单 PR 触及 `api/*.py` ≤8(本 PR:**4**) + +--- + +## 修订记录 + +| 日期 | 说明 | +|------|------| +| 2026-06-09 | v1:W8 实现 |