From fe5307bf4d7d269769f1a566e7c56e220af093bf Mon Sep 17 00:00:00 2001 From: Agent Date: Mon, 18 May 2026 12:18:59 +0800 Subject: [PATCH 1/3] feat: add tri-axis scan SOP --- memory/tri_axis_scan_sop.md | 460 ++++++++++++++++++++++++++++++++++++ 1 file changed, 460 insertions(+) create mode 100644 memory/tri_axis_scan_sop.md diff --git a/memory/tri_axis_scan_sop.md b/memory/tri_axis_scan_sop.md new file mode 100644 index 000000000..e0d7e3b2b --- /dev/null +++ b/memory/tri_axis_scan_sop.md @@ -0,0 +1,460 @@ +# 三轴扫描 SOP (tri_axis_scan_sop) + +## 触发条件 +定时任务触发(scheduler注入prompt),或用户手动要求"执行三轴扫描"。 + +## 关键路径(相对于项目根) +``` +数据源: memory/L4_raw_sessions/all_user_histories.txt +状态文件: reflect/analyzers/scan_state.json +中间产物: reflect/analyzers/_scan_results.json +归一化映射: reflect/analyzers/_normalize_map.json +最终报告: reflect/analyzers/scan_report.json +活动矩阵: reflect/analyzers/activity_matrix.json +``` + +--- + +## Phase 0: 数据准备(code_run) + +用code_run执行以下逻辑: +1. 读取 `scan_state.json`,获取 `emotion_last_line`(增量起点,首次为0) +2. 读取 `all_user_histories.txt`,按 `SESSION:` 头分割 +3. 提取每个session中的 `[USER]:` 行(忽略长度≤5的) +4. 只保留行号 > emotion_last_line 的新增行 +5. 分批:每批最多120行(按session为单位装入批次) +6. 将批次列表写入 `_batches.json`(JSON数组,每个元素=[{session, lines:[[行号,文本],...]}]) +7. 输出批次数量。若为0则无新数据,直接跳到Phase 4读取现有报告并结束 +8. 启动 task_monitor 后台进程(自动续发保护,防止Agent自停导致任务中断): + +```python +import subprocess, sys, os + +# 启动monitor后台进程 +monitor_script = "reflect/task_monitor.py" +task_name = "tri_axis_scan" # 与task目录名一致 +python_exe = sys.executable.replace('pythonw.exe', 'python.exe') + +proc = subprocess.Popen( + [python_exe, monitor_script, task_name, + "--complete-marker", "[TRI_AXIS_SCAN_COMPLETE]", + "--max-replies", "5", + "--interval", "15"], + creationflags=subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0, + stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL +) +print(f"Monitor started (PID={proc.pid}), will auto-reply if agent stalls") +``` + +--- +## Phase 1: 情绪+活动提取(code_run循环驱动,严格逐批) + +⛔ **核心约束(防止质量退化,已验证的失败教训):** +- **每轮只处理1批** —— 由code_run控制数据输出,Agent不可自行读取多批 +- 每轮固定结构:`code_run(读第N批)` → Agent分析 → `code_run(写入结果)` +- **禁止为了效率一次处理多批** —— 这会导致后半段数据编造,是已验证的失败模式 +- 分析必须基于当前轮code_run输出的实际文本,禁止凭记忆/概括 +- 49批就是49轮循环,不可压缩。慢是正常的,质量优先 + +### 步骤A: code_run读取当前批次 + +每轮开始时执行code_run,读取并打印第N批文本: + +```python +import json, os + +batches_file = "reflect/analyzers/_batches.json" +results_file = "reflect/analyzers/_scan_results.json" + +with open(batches_file, 'r', encoding='utf-8') as f: + batches = json.load(f) + +# 确定当前批次(断点恢复:检查已完成的批次数) +done = 0 +if os.path.exists(results_file): + with open(results_file, 'r', encoding='utf-8') as f: + done = len(json.load(f)) + +if done >= len(batches): + print("ALL_DONE: 所有批次已处理完毕,进入Phase 2") +else: + batch = batches[done] + print(f"=== 批次 {done}/{len(batches)} ===") + for item in batch: + print(f"\n--- SESSION: {item['session']} ---") + for line_no, text in item['lines']: + print(f"[{line_no}] {text}") + print(f"\n--- END BATCH {done} ---") +``` + +### 步骤B: Agent分析(严格按以下规则执行) + +阅读步骤A输出的文本,按以下规则分析: + +**情绪检测规则(只找出用户情绪强烈爆发的瞬间):** + +仅标记以下情况: +- 累积不满后的爆发(连续多轮不满后终于发火) +- 明确的愤怒/质问/责备(不是普通追问,是真的生气了) +- 强烈讽刺挖苦(带攻击性的,不是随口吐槽) +- 极度惊喜或感激(远超正常反应,如反复感叹) + +不标记(即使有轻微情绪): +- 普通的不耐烦、催促 +- 对结果不满意但语气平和的反馈 +- 简单的抱怨或吐槽 +- 任何可以理解为"正常沟通中的语气波动"的内容 + +判断技巧:去掉情绪化修饰后信息量是否减少?减少则标记。 + +label只用 NEGATIVE 或 POSITIVE。 + +**活动识别规则:** +- 为每个session提取用户实际在做的事情 +- 标签格式:动词+宾语,4-8字(如"配置远程服务器"、"学习力扣算法") +- 每个session至少1条,通常2-5条 +- 不明确的session标记为 ["不明确"] + +### 步骤C: code_run写入结果 + +分析完成后,用code_run将结果写入: + +```python +import json, os + +results_file = "reflect/analyzers/_scan_results.json" +results = [] +if os.path.exists(results_file): + with open(results_file, 'r', encoding='utf-8') as f: + results = json.load(f) + +# Agent填入本批分析结果 +new_result = { + "batch_idx": len(results), + "emotions": [ + # {"line": 行号, "text": "原文前30字", "label": "NEGATIVE", "reason": "一句话理由"} + ], + "activities": [ + # {"session": "session名", "tasks": ["标签1", "标签2"]} + ] +} + +results.append(new_result) +with open(results_file, 'w', encoding='utf-8') as f: + json.dump(results, f, ensure_ascii=False, indent=2) + +total = len(json.load(open("reflect/analyzers/_batches.json"))) +print(f"OK batch {new_result['batch_idx']} done, progress {len(results)}/{total}") +``` + +### 步骤D: 循环 + +重复步骤A-B-C,直到步骤A输出 `ALL_DONE`。 +每5批可输出一次简短进度,但不可跳过任何批次。 + +--- +## Phase 2: 标签归一化(数据驱动,非凭记忆) + +⛔ **必须执行本阶段,不可跳过。即使你认为标签已经足够清晰,也必须走归一化流程生成 `_normalize_map.json`。Phase 3+4 依赖此文件。** + +### 步骤A: code_run提取所有标签 + +```python +import json + +results_file = "reflect/analyzers/_scan_results.json" +with open(results_file, 'r', encoding='utf-8') as f: + results = json.load(f) + +all_tags = set() +for r in results: + for act in r.get('activities', []): + for t in act.get('tasks', []): + if t != "不明确": + all_tags.add(t) + +tags_sorted = sorted(all_tags) +print(f"共{len(tags_sorted)}个独立标签:") +# 分批输出(每批50个) +for i in range(0, len(tags_sorted), 50): + batch_tags = tags_sorted[i:i+50] + print(f"\n--- 标签批次 {i//50} ({len(batch_tags)}个) ---") + for t in batch_tags: + print(f" {t}") +``` + +### 步骤B: Agent归一化(逐批处理) + +对步骤A输出的每批50个标签,按以下规则归一化: +- 同一件事的不同表述合并(选最清晰简洁的作为归一化名) +- 例如:"编写单元测试"、"补充测试用例"、"写测试" -> "编写测试" +- 例如:"部署服务"、"部署到生产环境"、"上线服务" -> "部署服务" +- **保守合并**:只合并明确同义词,不确定的保持独立 +- "调试bug"和"修复bug"应保持独立(动作不同) +- 独立标签映射为自身 + +输出格式:{"原标签": "归一化名称", ...} + +若标签数>50,必须分批处理(每批50个),每批独立分析后合并。禁止一次性处理100+标签。 + +### 步骤C: code_run写入映射 + +```python +import json + +normalize_map = { + # Agent填入归一化映射(所有批次合并后的完整映射) +} + +normalize_file = "reflect/analyzers/_normalize_map.json" +with open(normalize_file, 'w', encoding='utf-8') as f: + json.dump(normalize_map, f, ensure_ascii=False, indent=2) +print(f"OK normalize_map written, {len(normalize_map)} entries") +``` + +--- +## Phase 3+4: 矩阵构建 + 习惯/消失判定(完整code_run,直接复制执行) + +⛔ **以下代码必须完整复制到code_run中执行,禁止修改、简化或用其他方式替代。输出文件名(scan_report.json, activity_matrix.json, scan_state.json)和JSON格式不可更改,禁止输出markdown报告代替。** + +```python +import json, os +from datetime import datetime, date + +# === 路径 === +base = "reflect/analyzers" +results_file = f"{base}/_scan_results.json" +normalize_file = f"{base}/_normalize_map.json" +report_file = f"{base}/scan_report.json" +matrix_file = f"{base}/activity_matrix.json" +state_file = f"{base}/scan_state.json" +data_file = "memory/L4_raw_sessions/all_user_histories.txt" + +# === 1. 读取数据 === +with open(results_file, 'r', encoding='utf-8') as f: + results = json.load(f) +with open(normalize_file, 'r', encoding='utf-8') as f: + norm_map = json.load(f) + +# === 2. 汇总情绪 === +all_emotions = [] +for r in results: + for e in r.get('emotions', []): + all_emotions.append(e) + +# === 3. 构建活动矩阵 === +today = date.today() +current_week = today.isocalendar() +current_week_str = f"{current_week[0]}-W{current_week[1]:02d}" + +# 从session名提取周次 +def session_to_week(session_name): + """从 MMdd_HHmm 格式提取周次""" + try: + parts = session_name.split('_') + mm = int(parts[0][:2]) + dd = int(parts[0][2:4]) + d = date(2026, mm, dd) + iso = d.isocalendar() + return f"{iso[0]}-W{iso[1]:02d}" + except: + return None + +# 构建 {归一化标签: {week: count}} +matrix = {} +for r in results: + for act in r.get('activities', []): + session = act.get('session', '') + week = session_to_week(session) + if not week: + continue + for task in act.get('tasks', []): + if task == "\u4e0d\u660e\u786e": # "不明确" + continue + # 应用归一化 + normalized = norm_map.get(task, task) + if normalized not in matrix: + matrix[normalized] = {} + matrix[normalized][week] = matrix[normalized].get(week, 0) + 1 + +# === 4. 计算每个标签的统计 === +def week_str_to_date(w): + """将 2026-W11 转为该周的周一日期""" + year, wk = int(w.split('-W')[0]), int(w.split('-W')[1]) + return datetime.strptime(f"{year}-W{wk:02d}-1", "%Y-W%W-%w").date() + +# 计算当前周和上一周的周次字符串 +from datetime import timedelta +last_2_weeks = set() +for i in range(2): + d = today - timedelta(weeks=i) + iso = d.isocalendar() + last_2_weeks.add(f"{iso[0]}-W{iso[1]:02d}") + +task_stats = {} +for task, weeks_data in matrix.items(): + total = sum(weeks_data.values()) + active_weeks = len(weeks_data) + sorted_weeks = sorted(weeks_data.keys()) + last_week = sorted_weeks[-1] if sorted_weeks else "" + is_recent = bool(set(weeks_data.keys()) & last_2_weeks) + + # 计算距今周数 + gap_weeks = 0 + if last_week and not is_recent: + try: + last_d = week_str_to_date(last_week) + gap_weeks = (today - last_d).days // 7 + except: + gap_weeks = 99 + + task_stats[task] = { + "total": total, + "active_weeks": active_weeks, + "last_week": last_week, + "is_recent": is_recent, + "gap_weeks": gap_weeks, + "weeks_detail": weeks_data + } + +# === 5. 判定 habits === +habits = [] +for task, s in task_stats.items(): + if s["active_weeks"] >= 2 and s["is_recent"] and s["total"] >= 3: + habits.append({"task": task, "total": s["total"], "active_weeks": s["active_weeks"], "last_week": s["last_week"]}) +habits.sort(key=lambda x: x["total"], reverse=True) +habits = habits[:15] + +# === 6. 判定 abandoned === +abandoned = [] +for task, s in task_stats.items(): + if s["total"] >= 3 and not s["is_recent"]: + abandoned.append({"task": task, "total": s["total"], "active_weeks": s["active_weeks"], "last_week": s["last_week"], "gap_weeks": s["gap_weeks"]}) +abandoned.sort(key=lambda x: x["total"], reverse=True) +abandoned = abandoned[:30] + +# === 7. 写入 scan_report.json === +report = { + "generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "emotions": { + "count": len(all_emotions), + "items": all_emotions + }, + "habits": habits, + "abandoned": abandoned, + "summary": f"本次扫描: {len(all_emotions)}条情绪波动, {len(habits)}项习惯, {len(abandoned)}项消失事项" +} +with open(report_file, 'w', encoding='utf-8') as f: + json.dump(report, f, ensure_ascii=False, indent=2) + +# === 8. 写入 activity_matrix.json === +matrix_output = {} +for task, s in sorted(task_stats.items(), key=lambda x: x[1]["total"], reverse=True): + matrix_output[task] = { + "total": s["total"], + "active_weeks": s["active_weeks"], + "last_week": s["last_week"], + "is_recent": s["is_recent"], + "weeks": s["weeks_detail"] + } +with open(matrix_file, 'w', encoding='utf-8') as f: + json.dump(matrix_output, f, ensure_ascii=False, indent=2) + +# === 9. 更新 scan_state.json === +# 读取数据文件总行数 +with open(data_file, 'r', encoding='utf-8') as f: + total_lines = sum(1 for _ in f) + +old_state = {} +if os.path.exists(state_file): + with open(state_file, 'r', encoding='utf-8') as f: + old_state = json.load(f) + +new_state = { + "emotion_last_line": total_lines, + "scan_count": old_state.get("scan_count", 0) + 1, + "last_scan_date": today.strftime("%Y-%m-%d"), + "status": "completed" +} +with open(state_file, 'w', encoding='utf-8') as f: + json.dump(new_state, f, ensure_ascii=False, indent=2) + +# === 10. 输出摘要 === +print(f"=== Phase 3+4 完成 ===") +print(f"情绪波动: {len(all_emotions)}条") +print(f"习惯事项: {len(habits)}项") +print(f"消失事项: {len(abandoned)}项") +print(f"活动矩阵: {len(matrix_output)}个标签") +print(f"\n已写入:") +print(f" {report_file}") +print(f" {matrix_file}") +print(f" {state_file}") +print(f"\nscan_state: emotion_last_line={total_lines}") +``` + +--- +## Phase 5: 清理 + 验证 + +⛔ **必须执行以下清理代码。最后一行 `print("[TRI_AXIS_SCAN_COMPLETE]")` 是task_monitor判断任务完成的关键标记,不可省略。** + +用code_run执行: + +```python +import json, os + +base = "reflect/analyzers" +batches_file = f"{base}/_batches.json" + +# 1. 删除临时批次文件 +if os.path.exists(batches_file): + os.remove(batches_file) + print(f"已删除: {batches_file}") + +# 2. 验证所有输出文件存在且格式正确 +required = ["scan_report.json", "activity_matrix.json", "scan_state.json"] +for fname in required: + path = f"{base}/{fname}" + if not os.path.exists(path): + print(f"ERROR: 缺失 {fname}") + continue + with open(path, 'r', encoding='utf-8') as f: + data = json.load(f) + print(f"OK: {fname} ({os.path.getsize(path)} bytes)") + +# 3. 输出最终摘要 +with open(f"{base}/scan_report.json", 'r', encoding='utf-8') as f: + report = json.load(f) +print(f"\n=== 最终报告摘要 ===") +print(report["summary"]) +print(f"\n保留中间产物供下次增量: _scan_results.json, _normalize_map.json") +print("\n[TRI_AXIS_SCAN_COMPLETE]") +``` + +--- + +## 周次推算规则 + +session名格式为 `MMdd_HHmm-MMdd_HHmm`,需要结合当前年份推算: +- 提取第一个MMdd +- 用 `datetime(2026, MM, DD).isocalendar()` 获取周次 +- 输出格式: `2026-Wxx` + +--- + +## 断点恢复 + +若执行中断: +1. 检查 `_batches.json` 是否存在 -> 存在则跳过Phase0 +2. 检查 `_scan_results.json` 已处理的batch_idx -> 从下一个继续Phase1 +3. 检查 `_normalize_map.json` 是否存在 -> 存在则跳过Phase2 + +--- + +## 注意事项 +- Agent自身就是LLM,直接阅读文本分析即可,不需要调用任何外部LLM API +- 每个Phase都是独立的code_run或分析步骤,单步不会超时 +- Phase1是严格循环:每批2次code_run(读+写)+ 1次分析,约25批需50轮工具调用 +- 若挂载到task模式运行,配合 `reflect/task_monitor.py` 自动续发(防止agent自停) +- 情绪检测高阈值:宁可漏检不可误检,只标记真正的情绪爆发 +- 活动标签要具体:避免过于笼统(如"编程"),应该是"调试API接口"这样的粒度 +- Phase2归一化保守合并:只合并明确同义词,不确定的保持独立 From 6f7f4ddaa9520d9b727a34c188e81ad84a0389b4 Mon Sep 17 00:00:00 2001 From: Agent Date: Tue, 19 May 2026 10:56:44 +0800 Subject: [PATCH 2/3] save: history_insight_sop + build_report (aligned to tri_axis_scanner output format) --- memory/L4_raw_sessions/session_traceback.py | 282 ++++++++++ memory/build_report.py | 207 ++++++++ memory/history_insight_sop.md | 82 +++ reflect/analyzers/tri_axis_scanner.py | 553 ++++++++++++++++++++ 4 files changed, 1124 insertions(+) create mode 100644 memory/L4_raw_sessions/session_traceback.py create mode 100644 memory/build_report.py create mode 100644 memory/history_insight_sop.md create mode 100644 reflect/analyzers/tri_axis_scanner.py diff --git a/memory/L4_raw_sessions/session_traceback.py b/memory/L4_raw_sessions/session_traceback.py new file mode 100644 index 000000000..8ab2f83f5 --- /dev/null +++ b/memory/L4_raw_sessions/session_traceback.py @@ -0,0 +1,282 @@ +"""L4 Session Traceback Tool — 从 all_histories.txt 摘要溯源到完整上下文 +用法: + from session_traceback import traceback + result = traceback("你要不断学习不断迭代直到我主动干预喊停") + print(result['before']) # 前文 + print(result['match']) # 匹配的完整turn + print(result['after']) # 后文 +""" +import zipfile, os, re +from typing import Optional + +L4_DIR = os.path.dirname(os.path.abspath(__file__)) +HIST_PATH = os.path.join(L4_DIR, "all_histories.txt") + +# ─── 内部工具 ─── + +def _load_history_lines(): + with open(HIST_PATH, 'r', encoding='utf-8') as f: + return f.readlines() + +def _find_session(lines: list, target_idx: int) -> Optional[str]: + """从目标行往上找最近的SESSION标记""" + for i in range(target_idx, -1, -1): + if lines[i].startswith("SESSION: "): + return lines[i].strip().replace("SESSION: ", "") + return None + +def _session_to_zip(session: str) -> str: + """从session名推断zip路径 (MMDD_HHMM-MMDD_HHMM → 2026-MM.zip)""" + month = session[:2] + return os.path.join(L4_DIR, f"2026-{month}.zip") + +def _count_occurrence_in_history(lines: list, target_idx: int, session_start_idx: int) -> int: + """计算目标行在同session中是第几次出现(用于消歧短文本)""" + target_text = lines[target_idx].strip() + count = 0 + for i in range(session_start_idx, target_idx + 1): + if lines[i].strip() == target_text: + count += 1 + return count + +def _find_session_start(lines: list, target_idx: int) -> int: + """找到当前session的起始行""" + for i in range(target_idx, -1, -1): + if lines[i].startswith("SESSION: "): + return i + return 0 + +def _extract_turn_boundaries(content: str): + """解析session文件,返回每个turn的(start, user_start, response_start, end)""" + prompt_pattern = re.compile(r'^=== Prompt === .+$', re.MULTILINE) + user_pattern = re.compile(r'^=== USER ===$', re.MULTILINE) + response_pattern = re.compile(r'^=== Response === .+$', re.MULTILINE) + + prompts = [m.start() for m in prompt_pattern.finditer(content)] + turns = [] + for i, p_start in enumerate(prompts): + turn_end = prompts[i+1] if i+1 < len(prompts) else len(content) + # 找这个turn内的USER和Response标记 + segment = content[p_start:turn_end] + u_match = user_pattern.search(segment) + r_match = response_pattern.search(segment) + user_pos = p_start + u_match.start() if u_match else None + resp_pos = p_start + r_match.start() if r_match else None + turns.append({ + 'start': p_start, + 'end': turn_end, + 'user_pos': user_pos, + 'resp_pos': resp_pos, + }) + return turns + +def _get_user_text(content: str, turn: dict) -> str: + """提取一个turn中的用户文本""" + if turn['user_pos'] is None: + return "" + # USER文本在 "=== USER ===\n" 之后,到 "=== Response ===" 之前 + start = content.index('\n', turn['user_pos']) + 1 + end = turn['resp_pos'] if turn['resp_pos'] else turn['end'] + return content[start:end].strip() + +def _get_response_text(content: str, turn: dict) -> str: + """提取一个turn中的response文本""" + if turn['resp_pos'] is None: + return "" + start = content.index('\n', turn['resp_pos']) + 1 + return content[start:turn['end']].strip() + + +# ─── 主函数 ─── + +def traceback(query: str, context_chars: int = 1500, nth: int = 0) -> dict: + """从 all_histories.txt 中的文本溯源到完整上下文 + + Args: + query: 要搜索的文本(可以是 [USER]: xxx 或 [Agent] xxx 格式,也可以是纯文本) + context_chars: 前后文各取多少字符(默认1500) + nth: 如果有多个匹配,取第几个(0-based,默认第一个) + + Returns: + dict with keys: + - session: session名 + - zip_file: zip文件名 + - before: 前文(前一个turn的response尾部 + 当前turn的prompt头) + - match: 匹配到的完整内容(用户消息或agent回复片段) + - after: 后文(当前turn的response或下一个turn的开头) + - turn_index: 在session中第几个turn + - total_turns: session总turn数 + - history_context: all_histories.txt中的上下文行 + """ + lines = _load_history_lines() + + # 标准化查询:去掉前缀 + search_text = query.strip() + if search_text.startswith("[USER]: "): + search_text = search_text[8:] + search_type = "USER" + elif search_text.startswith("[Agent] "): + search_text = search_text[8:] + search_type = "AGENT" + else: + search_type = "AUTO" + + # Step 1: 在 all_histories.txt 中定位 + matches = [] + for i, line in enumerate(lines): + stripped = line.strip() + if search_text in stripped: + session = _find_session(lines, i) + if session: + matches.append((i, session, stripped)) + + if not matches: + return {"error": f"未在 all_histories.txt 中找到: '{search_text[:50]}...'"} + + if nth >= len(matches): + return {"error": f"只找到 {len(matches)} 个匹配,但请求第 {nth+1} 个"} + + target_idx, session, hist_line = matches[nth] + + # 获取 history 上下文(前后各3行) + hist_ctx_start = max(0, target_idx - 3) + hist_ctx_end = min(len(lines), target_idx + 4) + history_context = [] + for j in range(hist_ctx_start, hist_ctx_end): + marker = ">>>" if j == target_idx else " " + history_context.append(f"{marker} {lines[j].rstrip()}") + + # Step 2: 定位zip和文件 + zip_path = _session_to_zip(session) + target_file = f"{session}.txt" + + if not os.path.exists(zip_path): + return {"error": f"ZIP文件不存在: {zip_path}"} + + with zipfile.ZipFile(zip_path, 'r') as zf: + if target_file not in zf.namelist(): + return {"error": f"Session文件 {target_file} 不在 {os.path.basename(zip_path)} 中"} + with zf.open(target_file) as f: + content = f.read().decode('utf-8', errors='replace') + + # Step 3: 在session文件中定位 + # 计算这是session内第几次出现(消歧短文本) + session_start = _find_session_start(lines, target_idx) + occurrence = _count_occurrence_in_history(lines, target_idx, session_start) + + # 搜索策略:先精确搜索,再降级 + if search_type == "USER": + # 用户原话精确存在于 "=== USER ===" 之后 + search_key = search_text[:80] if len(search_text) > 80 else search_text + elif search_type == "AGENT": + # Agent摘要在 标签内 + search_key = search_text[:60] if len(search_text) > 60 else search_text + else: + search_key = search_text[:60] if len(search_text) > 60 else search_text + + # 找到第 occurrence 次出现 + pos = -1 + start_search = 0 + for _ in range(occurrence): + pos = content.find(search_key, start_search) + if pos == -1: + break + start_search = pos + 1 + + if pos == -1: + # 降级:用更短的关键词 + for length in [40, 20, 10]: + short_key = search_text[:length] + pos = content.find(short_key) + if pos >= 0: + break + + if pos == -1: + return { + "error": f"在session文件中未找到匹配文本", + "session": session, + "zip_file": os.path.basename(zip_path), + "history_context": "\n".join(history_context), + "total_matches_in_history": len(matches), + } + + # Step 4: 提取上下文 + before_start = max(0, pos - context_chars) + after_end = min(len(content), pos + len(search_key) + context_chars) + + before_text = content[before_start:pos] + match_text = content[pos:pos + len(search_key)] + after_text = content[pos + len(search_key):after_end] + + # 边界处理:判断前文/后文是否有实质内容 + # 前文:如果只剩 "=== Prompt === ...\n=== USER ===\n" 这类header,视为无效 + before_stripped = re.sub(r'=== (Prompt|USER|Response) ===[^\n]*\n?', '', before_text).strip() + if len(before_stripped) < 20: + before_text = None + + # 后文:如果剩余内容不足20字符有效文本,视为无效 + after_stripped = re.sub(r'=== (Prompt|USER|Response) ===[^\n]*\n?', '', after_text).strip() + if len(after_stripped) < 20: + after_text = None + + # 解析turn结构获取额外信息 + turns = _extract_turn_boundaries(content) + turn_index = -1 + for ti, turn in enumerate(turns): + if turn['start'] <= pos < turn['end']: + turn_index = ti + break + + return { + "session": session, + "zip_file": os.path.basename(zip_path), + "turn_index": turn_index, + "total_turns": len(turns), + "position": pos, + "file_size": len(content), + "before": before_text, + "match": match_text, + "after": after_text, + "history_context": "\n".join(history_context), + "total_matches_in_history": len(matches), + "selected_match": nth, + } + + +def traceback_pretty(query: str, context_chars: int = 1500, nth: int = 0) -> str: + """格式化输出的溯源结果""" + r = traceback(query, context_chars, nth) + if "error" in r: + return f"❌ {r['error']}\n" + r.get('history_context', '') + + output = [] + output.append(f"{'='*60}") + output.append(f"📍 Session: {r['session']} (Turn {r['turn_index']+1}/{r['total_turns']})") + output.append(f"📦 ZIP: {r['zip_file']} | 位置: {r['position']}/{r['file_size']}") + output.append(f"🔍 History匹配: 第{r['selected_match']+1}/{r['total_matches_in_history']}个") + output.append(f"{'='*60}") + output.append(f"\n--- all_histories.txt 上下文 ---") + output.append(r['history_context']) + + if r['before'] is not None: + output.append(f"\n--- 前文 (最后{len(r['before'])}字符) ---") + output.append(r['before'][-800:]) + else: + output.append(f"\n--- 前文: 无(这是session的开头) ---") + + output.append(f"\n{'>'*20} 匹配内容 {'<'*20}") + output.append(r['match']) + output.append(f"{'>'*20} 匹配结束 {'<'*20}") + + if r['after'] is not None: + output.append(f"\n--- 后文 (前{len(r['after'])}字符) ---") + output.append(r['after'][:800]) + else: + output.append(f"\n--- 后文: 无(这是session的结尾) ---") + + return "\n".join(output) + + +if __name__ == "__main__": + # 测试 + print(traceback_pretty("你要不断学习不断迭代直到我主动干预喊停")) diff --git a/memory/build_report.py b/memory/build_report.py new file mode 100644 index 000000000..49b27fc1c --- /dev/null +++ b/memory/build_report.py @@ -0,0 +1,207 @@ +# history_insight P3+P4: 矩阵构建+判定+写入产物 +# 输出格式对齐 reflect/analyzers/tri_axis_scanner.py +# 用法: python build_report.py [工作目录] +import json, os, sys, time +from datetime import datetime, date +from collections import defaultdict + +t0 = time.time() +os.chdir(sys.argv[1] if len(sys.argv) > 1 else ".") +scan_results = json.load(open("scan_results.json", encoding="utf-8")) +normalize_map = json.load(open("normalize_map.json", encoding="utf-8")) + +# 读取上次状态 +prev_state = {} +if os.path.exists("scan_state.json"): + prev_state = json.load(open("scan_state.json", encoding="utf-8")) +start_line = prev_state.get("emotion_last_line", 0) + +# ============================================================ +# Phase 3a: 提取情绪事件 (对齐 emotion.detections 格式) +# ============================================================ +all_emotions = [] +for batch in scan_results: + for e in batch.get("emotions", []): + # 对齐字段名: line→line_no, 补 traceback_query/occurrence_nth + raw_text = e.get("text", "") + # text 格式: "[USER]: 原文" + text_with_prefix = raw_text if raw_text.startswith("[USER]:") else f"[USER]: {raw_text}" + # traceback_query: 不带前缀的纯文本 + traceback_query = raw_text.lstrip("[USER]: ").strip() if raw_text.startswith("[USER]:") else raw_text.strip() + + all_emotions.append({ + "line_no": e.get("line_no", e.get("line", 0)), + "label": e.get("label", "NEGATIVE"), + "reason": e.get("reason", ""), + "text": text_with_prefix, + "traceback_query": traceback_query, + "occurrence_nth": e.get("occurrence_nth", 0), + }) + +# 按行号排序 +all_emotions.sort(key=lambda x: x["line_no"]) +max_line = max((e["line_no"] for e in all_emotions), default=start_line) + +# 统计 +total_user_lines = max_line # 近似: 最大行号≈总用户行数 +total_negative = sum(1 for e in all_emotions if e["label"] == "NEGATIVE") +total_positive = sum(1 for e in all_emotions if e["label"] == "POSITIVE") +scan_range = [start_line + 1, max_line] if start_line > 0 else [1, max_line] +new_lines_scanned = scan_range[1] - scan_range[0] + 1 if scan_range[1] >= scan_range[0] else 0 + +emotion_stats = { + "total_user_lines": total_user_lines, + "total_negative": total_negative, + "total_positive": total_positive, + "detection_rate": round(len(all_emotions) / max(total_user_lines, 1) * 100, 1), +} + +# ============================================================ +# Phase 3b: 构建活动矩阵 + task_sessions (用于 source_lines) +# ============================================================ +matrix = defaultdict(lambda: defaultdict(int)) +task_sessions = defaultdict(list) # {task: [{text, session}]} + +for batch in scan_results: + for act in batch.get("activities", []): + sess = act.get("session", "") + try: + mm, dd = int(sess[:2]), int(sess[2:4]) + week_str = f"2026-W{date(2026, mm, dd).isocalendar()[1]:02d}" + except (ValueError, IndexError): + continue + for raw_tag in act.get("tasks", []): + tag = normalize_map.get(raw_tag, raw_tag) + matrix[tag][week_str] += 1 + # 收集 source_lines (每个task最多保留15条) + if len(task_sessions[tag]) < 15: + # 取该session中的一条代表性文本 + text_sample = act.get("text", raw_tag) + task_sessions[tag].append({"text": text_sample, "session": sess}) + +# 合并已有矩阵(增量) +if os.path.exists("activity_matrix.json"): + old_matrix = json.load(open("activity_matrix.json", encoding="utf-8")) + for tag, weeks in old_matrix.items(): + for w, c in weeks.items(): + if w not in matrix[tag] or matrix[tag][w] == 0: + matrix[tag][w] = c + +# ============================================================ +# Phase 3c: 习惯/消失判定 (对齐 tri_axis_scanner.py 格式) +# ============================================================ +iso_now = date.today().isocalendar() +current_week = f"{iso_now[0]}-W{iso_now[1]:02d}" +recent_weeks = {current_week, f"{iso_now[0]}-W{max(iso_now[1]-1, 1):02d}"} + +habits = [] +abandoned = [] + +for task, week_counts in matrix.items(): + total_count = sum(week_counts.values()) + active_weeks = sorted(week_counts.keys()) + span = len(active_weeks) + last_week = active_weeks[-1] if active_weeks else "" + + # 计算 gap(当前周 - 最后活跃周) + try: + cur_y, cur_w = current_week.split("-W") + last_y, last_w = last_week.split("-W") + gap = (int(cur_y) - int(last_y)) * 52 + (int(cur_w) - int(last_w)) + except: + gap = 0 + + is_recent = any(w in recent_weeks for w in active_weeks) + + # source_lines: 最多15条 (仅habits需要) + sources = task_sessions.get(task, [])[:15] + + if span >= 2 and is_recent and total_count >= 3: + habits.append({ + "task": task, + "weeks_active": active_weeks, + "total_count": total_count, + "span": span, + "source_lines": sources, + }) + elif total_count >= 3 and not is_recent: + abandoned.append({ + "task": task, + "weeks_active": active_weeks, + "total_count": total_count, + "last_week": last_week, + "gap": gap, + }) + +# 排序+限制数量 +habits.sort(key=lambda x: x["total_count"], reverse=True) +abandoned.sort(key=lambda x: x["total_count"], reverse=True) +habits = habits[:15] +abandoned = abandoned[:30] + +elapsed = round(time.time() - t0, 1) + +# ============================================================ +# Phase 4: 输出报告 (对齐 tri_axis_scanner.py report 结构) +# ============================================================ +summary = ( + f"情绪: {len(all_emotions)}条检出 | " + f"习惯: {len(habits)}项 [{', '.join(h['task'] for h in habits[:3])}] | " + f"消失: {len(abandoned)}项 [{', '.join(a['task'] for a in abandoned[:3])}]" +) + +report = { + "scan_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "elapsed_seconds": elapsed, + "emotion": { + "count": len(all_emotions), + "scan_range": scan_range, + "new_lines_scanned": new_lines_scanned, + "detections": all_emotions, + "stats": emotion_stats, + }, + "habits": { + "count": len(habits), + "items": habits, + }, + "abandoned": { + "count": len(abandoned), + "items": abandoned, + }, + "errors": [], + "summary": summary, +} + +# 写入产物 +with open("scan_report.json", "w", encoding="utf-8") as f: + json.dump(report, f, ensure_ascii=False, indent=2) + +# activity_matrix: 转为普通dict写入 +matrix_output = {k: dict(v) for k, v in matrix.items()} +with open("activity_matrix.json", "w", encoding="utf-8") as f: + json.dump(matrix_output, f, ensure_ascii=False, indent=2) + +# scan_state: 对齐字段 +N = len(scan_results) +state = { + "phase": "P4_COMPLETE", + "batches_total": N, + "batches_done": N, + "p2_done": True, + "p3_done": True, + "last_scan_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "last_updated": date.today().isoformat(), + "emotion_last_line": max_line, + "scan_count": prev_state.get("scan_count", 0) + 1, +} +with open("scan_state.json", "w", encoding="utf-8") as f: + json.dump(state, f, ensure_ascii=False, indent=2) + +# P4清理+验证 +if os.path.exists("batches.json"): + os.remove("batches.json") +for f_name in ["scan_report.json", "activity_matrix.json", "scan_state.json"]: + json.load(open(f_name, encoding="utf-8")) + +print(f"[BUILD_REPORT_DONE] emotions={len(all_emotions)}, habits={len(habits)}, abandoned={len(abandoned)}, matrix_tags={len(matrix)}") +print(f" elapsed: {elapsed}s | summary: {summary}") diff --git a/memory/history_insight_sop.md b/memory/history_insight_sop.md new file mode 100644 index 000000000..58436d247 --- /dev/null +++ b/memory/history_insight_sop.md @@ -0,0 +1,82 @@ +# 历史洞察扫描 SOP (history_insight_sop) + +从 L4 历史对话中提取三类有价值信息:情绪爆发、持续习惯、消失事项。 + +## 路径 +- 数据源: `../memory/L4_raw_sessions/all_user_histories.txt` +- 产物全部在 `./`(temp目录): `batches.json`, `scan_results.json`, `normalize_map.json`, `scan_report.json`, `activity_matrix.json`, `scan_state.json` + +## 流程概览 +P0数据准备 → P1逐批提取(循环) → P2标签归一化+执行脚本 + +--- + +## P0: 数据准备 +前置依赖(若数据源不存在则按顺序生成): +1. 运行 `compress_session.py` → 生成 `all_histories.txt` +2. 从 `all_histories.txt` 过滤掉 `[Agent]:` 行 → 生成 `all_user_histories.txt` + +读 `scan_state.json` 获取 `emotion_last_line`(增量起点,首次=0)。读数据源按 `SESSION:` 分割,提取 `[USER]:` 行(忽略≤5字的),只保留行号>起点的新增行。按session为单位装入批次(每批≤120行),写 `batches.json`。格式:`[{session: "名", lines: [[全局行号, "文本"], ...]}, ...]`。 + +## P1: 逐批提取(核心循环) + +⛔ **严格单批处理。已验证:合并多批→后半段数据被编造。** + +每轮固定3步,不可变形: +1. code_run:从`batches.json`读第N批(N=已有结果数),打印格式 `[行号] 文本` +2. 分析当前输出文本(禁止凭记忆补充、禁止预读下一批) +3. code_run:将本批结果追加到`scan_results.json`,打印进度`done X/total` + +读取脚本自动计算进度:`done = len(已有结果)`,只输出`batches[done]`。N批就是N轮循环,不可压缩。 + +⛔ **最后一批写入后必须打印提醒:** `print("P1完成!下一步:P2写normalize_map.json → P3执行 python ../memory/build_report.py .")` + +### 情绪检测(高阈值,宁漏不误) + +仅标记: +- 累积不满后的爆发(连续多轮后终于发火) +- 明确愤怒/质问/责备(真的生气,不是追问) +- 强烈讽刺挖苦(带攻击性) +- 极度惊喜感激(远超正常反应) + +**不标记**:普通不耐烦、催促、语气平和的不满、日常吐槽。判断技巧:去掉情绪修饰后信息量是否减少?减少才标记。 + +label: NEGATIVE / POSITIVE。输出: `{"line_no": 行号, "text": "[USER]: 前30字", "label": "X", "reason": "一句话", "traceback_query": "前30字(无[USER]:前缀)", "occurrence_nth": 0}` + +`occurrence_nth`: 该文本在全文中第几次出现(从0开始),供 `session_traceback.py` 精确溯源。大多数情况为0。 + +### 活动识别 +每session提取用户在做什么。标签格式:动词+宾语 4-8字(如"配置远程服务器")。每session 1-5条。不明确标 `["不明确"]`。 + +输出: `{"session": "名", "tasks": ["标签1", ...], "text": "该session中最能代表活动的一句用户原文(前30字)"}` + +`text` 字段供 `build_report.py` 生成 `source_lines`,用于后续 `session_traceback.py` 溯源。 + +## P2: 标签归一化 + +⛔ **不可跳过,P3脚本读 `normalize_map.json`,不存在则报错。** + +⛔ **归一化前必须 code_run 读取已有 `activity_matrix.json` 的标签列表(若存在)。新标签优先映射到已有标签(语义一致时),保证跨次运行标签一致性。** + +提取所有标签去重排序,每批50个进行同义合并。规则: +- 新标签与已有matrix标签语义一致时,映射到已有标签名(优先级最高) +- 保守合并:只合并明确同义词,不确定保持独立 +- 同一功能的子步骤合并为功能级(如"X功能重构"+"X功能测试"+"X功能PR"→"X功能开发"),跨性质保持独立(开发≠文档≠部署) +- "调试bug"≠"修复bug"(动作不同) +- 输出完整映射 `{"原标签": "归一化名", ...}`,独立标签映射为自身 + +⛔ **归一化完成后必须 code_run 写入 `normalize_map.json`,同一个 code_run 内紧接着执行:** +```python +import subprocess, sys +r = subprocess.run([sys.executable.replace("pythonw","python"), "../memory/build_report.py", "."], capture_output=True, text=True) +print(r.stdout); print(r.stderr) +``` +看到 `[BUILD_REPORT_DONE]` 即全部完成。未看到则读stderr排错。 + +--- + +## 坑点 +- session名格式 `MMdd_HHmm-MMdd_HHmm`,取第一个MMdd推算周次 +- `week_str_to_date` 用 `"%Y-W%W-%w"` 格式解析 +- 归一化时标签数可能300+,必须分批处理避免质量退化 +- activity_matrix结构是 `{归一化标签: {week: count}}`(按标签分组),不是 `{week: {标签: count}}`(按周分组) diff --git a/reflect/analyzers/tri_axis_scanner.py b/reflect/analyzers/tri_axis_scanner.py new file mode 100644 index 000000000..70aede432 --- /dev/null +++ b/reflect/analyzers/tri_axis_scanner.py @@ -0,0 +1,553 @@ +"""tri_axis_scanner.py - 统一扫描器 +一次扫描同时输出:情绪波动点 + 活动标签,再通过归一化+矩阵判定习惯/消失。 +支持增量扫描,输出 scan_report.json / scan_state.json / activity_matrix.json。 +""" +import sys, os, json, time, re +from collections import defaultdict +from datetime import datetime, date + +PROJECT_ROOT = os.path.normpath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', '..')) +sys.path.insert(0, PROJECT_ROOT) + +from llmcore import fast_ask + +CFG = os.environ.get("SCANNER_LLM_CFG", "claude_config") +BASE_DIR = os.path.dirname(os.path.abspath(__file__)) +L4_DIR = os.path.join(PROJECT_ROOT, 'memory', 'L4_raw_sessions') +DATA_PATH = os.path.join(L4_DIR, 'all_user_histories.txt') +REPORT_FILE = os.path.join(BASE_DIR, "scan_report.json") +STATE_FILE = os.path.join(BASE_DIR, "scan_state.json") +MATRIX_FILE = os.path.join(BASE_DIR, "activity_matrix.json") + + +def prepare_data(): + """调用compress_session生成all_histories.txt,再过滤出仅含USER行的all_user_histories.txt""" + import importlib.util + compress_script = os.path.join(L4_DIR, 'compress_session.py') + if not os.path.exists(compress_script): + return + # 动态加载compress_session模块 + spec = importlib.util.spec_from_file_location("compress_session", compress_script) + cs = importlib.util.module_from_spec(spec) + spec.loader.exec_module(cs) + # 执行batch_process更新all_histories.txt + raw_dir = os.path.join(PROJECT_ROOT, 'temp', 'model_responses') + if os.path.isdir(raw_dir): + cs.batch_process(raw_dir, l4_dir=L4_DIR, dry_run=False) + # 从all_histories.txt过滤出仅USER行 → all_user_histories.txt + all_hist = os.path.join(L4_DIR, 'all_histories.txt') + if not os.path.exists(all_hist): + return + with open(all_hist, 'r', encoding='utf-8', errors='replace') as f: + lines = f.readlines() + with open(DATA_PATH, 'w', encoding='utf-8') as f: + for line in lines: + stripped = line.strip() + if stripped.startswith('SESSION:') or stripped.startswith('=' * 10) or stripped.startswith('[USER]:'): + f.write(line) + + +def p(msg): + print(msg, flush=True) + + +# ============================================================ +# PROMPTS +# ============================================================ +UNIFIED_PROMPT = """你是一个精确的对话分析器,同时执行两个任务: + +## 任务1: 情绪波动检测 +只找出用户情绪强烈爆发的瞬间。 + +仅标记以下情况: +- 累积不满后的爆发(连续多轮不满后终于发火) +- 明确的愤怒/质问/责备(不是普通追问,是真的生气了) +- 强烈讽刺挖苦(带攻击性的,不是随口吐槽) +- 极度惊喜或感激(远超正常反应,如反复感叹) + +不标记(即使有轻微情绪): +- 普通的不耐烦、催促 +- 对结果不满意但语气平和的反馈 +- 简单的抱怨或吐槽 +- 任何可以理解为"正常沟通中的语气波动"的内容 + +判断技巧:去掉情绪化修饰后信息量是否减少?减少则标记。 + +## 任务2: 活动标注 +为每个session标注1-3个活动标签,描述用户主动发起的目标或项目(动词+宾语)。 +- 只标注用户真正想做的事,忽略AI的中间执行步骤 +- 如果session内容不明确或太短,标注为"不明确" + +## 输入格式 +多个session,每个session有编号和多条用户发言(带行号)。 + +## 输出格式 +严格JSON,包含两个数组: +{ + "emotions": [{"line": 行号, "label": "NEGATIVE"|"POSITIVE", "reason": "一句话理由"}], + "activities": [{"session": session编号, "tasks": ["标签1", "标签2"]}] +} + +- emotions: 只输出强烈情绪爆发的行,没有则为空数组。 +- activities: 每个session都要有一条 + +只输出JSON,不要其他内容。""" + +NORMALIZE_PROMPT = """你是一个标签归一化器。给定一组活动标签,将含义相同或高度相关的标签合并为统一名称。 + +规则: +- 同一件事的不同表述合并为一个(选最清晰简洁的) +- 例如:"编写单元测试"、"补充测试用例"、"写测试" -> "编写测试" +- 例如:"部署服务"、"部署到生产环境"、"上线服务" -> "部署服务" +- 独立标签保持原样,不强行合并 + +输出JSON对象: {"原标签": "归一化名称", ...} +只输出JSON。""" + + +# ============================================================ +# 工具函数 +# ============================================================ +def robust_json_parse(text): + text = text.strip() + if text.startswith('```'): + text = re.sub(r'^```\w*\n?', '', text) + text = re.sub(r'\n?```$', '', text) + text = text.strip() + try: + return json.loads(text) + except: + m = re.search(r'\{[\s\S]*\}', text) + if m: + try: + return json.loads(m.group()) + except: + pass + return None + + +def load_state(): + if os.path.exists(STATE_FILE): + with open(STATE_FILE, 'r', encoding='utf-8') as f: + return json.load(f) + return {"last_scan_time": None, "emotion_last_line": 0, "habits_last_scan": None, "scan_count": 0} + + +def save_state(state): + with open(STATE_FILE, 'w', encoding='utf-8') as f: + json.dump(state, f, ensure_ascii=False, indent=2) + + +def get_week(session_name): + """从session名提取周标识""" + m = re.match(r'(\d{4})(\d{2})(\d{2})', session_name) + if m: + try: + d = date(int(m.group(1)), int(m.group(2)), int(m.group(3))) + return f"{d.year}-W{d.isocalendar()[1]:02d}" + except: + pass + m = re.match(r'(\d{2})(\d{2})_', session_name) + if m: + try: + d = date(date.today().year, int(m.group(1)), int(m.group(2))) + return f"{d.year}-W{d.isocalendar()[1]:02d}" + except: + pass + return "unknown" + + +def get_current_week(): + today = date.today() + return f"{today.year}-W{today.isocalendar()[1]:02d}" + + +# ============================================================ +# 数据加载 +# ============================================================ +def load_sessions(start_line=0): + """加载数据,返回 [(session_name, [(global_line_no, text), ...])] + 如果 start_line > 0,只加载该行之后的内容(增量模式)。 + 同时返回文件总行数。 + """ + with open(DATA_PATH, 'r', encoding='utf-8') as f: + lines = f.readlines() + + total_lines = len(lines) + sessions = [] + current_session = None + current_lines = [] + + for i, line in enumerate(lines, 1): + stripped = line.strip() + if not stripped: + continue + if stripped.startswith('SESSION:'): + if current_session and current_lines: + # 只保留有新行的session + new_lines = [(ln, t) for ln, t in current_lines if ln > start_line] + if new_lines: + sessions.append((current_session, current_lines)) # 保留完整session用于上下文 + current_session = stripped[8:].strip() + current_lines = [] + elif stripped.startswith('=' * 10): + continue + elif stripped.startswith('[USER]:'): + text = stripped[7:].strip() + if text and len(text) > 5: + current_lines.append((i, text)) + + if current_session and current_lines: + new_lines = [(ln, t) for ln, t in current_lines if ln > start_line] + if new_lines: + sessions.append((current_session, current_lines)) + + return sessions, total_lines + + +# ============================================================ +# 分批 +# ============================================================ +def build_batches(sessions, max_lines_per_batch=60): + batches = [] + current_batch = [] + current_lines_count = 0 + + for session_name, session_lines in sessions: + if len(session_lines) > max_lines_per_batch: + if current_batch: + batches.append(current_batch) + current_batch = [] + current_lines_count = 0 + batches.append([(session_name, session_lines)]) + continue + + if current_lines_count + len(session_lines) > max_lines_per_batch: + batches.append(current_batch) + current_batch = [] + current_lines_count = 0 + + current_batch.append((session_name, session_lines)) + current_lines_count += len(session_lines) + + if current_batch: + batches.append(current_batch) + + return batches + + +def format_batch(batch): + parts = [] + for idx, (session_name, session_lines) in enumerate(batch, 1): + parts.append(f"--- Session {idx}: {session_name} ---") + for line_no, text in session_lines: + parts.append(f" [{line_no}] {text}") + parts.append("") + return '\n'.join(parts) + + +# ============================================================ +# 主流程 +# ============================================================ +def main(): + t0 = time.time() + p(f"[TriAxisScanner] 启动, cfg={CFG}") + + # Phase 0: 数据准备 - 压缩原始日志并提取USER行 + prepare_data() + + # 加载状态(增量扫描) + state = load_state() + start_line = state.get('emotion_last_line', 0) + is_incremental = start_line > 0 + p(f"[模式] {'增量' if is_incremental else '全量'} (从第{start_line}行开始)") + + # Phase 0: 加载数据 + sessions, total_file_lines = load_sessions(start_line) + total_user_lines = sum(len(lines) for _, lines in sessions) + new_lines_count = sum(1 for _, lines in sessions for ln, _ in lines if ln > start_line) + p(f"[数据] {len(sessions)} sessions, {total_user_lines} USER行 (新增{new_lines_count}行)") + + if not sessions: + p("[完成] 无新数据需要扫描") + return + + # 分批 + batches = build_batches(sessions, max_lines_per_batch=60) + p(f"[分批] {len(batches)} 批") + + # 加载已有报告(增量模式下合并) + existing_emotions = [] + if is_incremental and os.path.exists(REPORT_FILE): + with open(REPORT_FILE, 'r', encoding='utf-8') as f: + old_report = json.load(f) + existing_emotions = old_report.get('emotion', {}).get('detections', []) + p(f"[增量] 已有{len(existing_emotions)}条情绪记录") + + # Phase 1: 统一扫描 + p("\n[Phase1] 统一扫描...") + all_emotions = list(existing_emotions) # 保留旧数据 + all_activities = [] # [(session_name, [tasks], [(line_no, text)])] + emotion_counter = defaultdict(int) # 用于occurrence_nth + + for batch_idx, batch in enumerate(batches): + user_content = format_batch(batch) + prompt = UNIFIED_PROMPT + "\n\n## 待分析内容\n" + user_content + + try: + result = fast_ask(prompt, CFG) + parsed = robust_json_parse(result) + + if not parsed: + p(f" Batch {batch_idx+1}/{len(batches)}: PARSE FAILED") + for session_name, session_lines in batch: + all_activities.append((session_name, ["不明确"], session_lines)) + continue + + # 提取情绪 + emotions = parsed.get('emotions', []) + for emo in emotions: + try: + line_no = int(emo.get('line', 0)) + except (ValueError, TypeError): + continue + # 找到对应的原文 + text = "" + for _, session_lines in batch: + for ln, t in session_lines: + if ln == line_no: + text = t + break + if text: + break + + label = emo.get('label', 'NEGATIVE') + emotion_counter[label] += 1 + + all_emotions.append({ + 'line_no': line_no, + 'label': label, + 'reason': emo.get('reason', ''), + 'text': f"[USER]: {text}", + 'traceback_query': text, + 'occurrence_nth': emotion_counter[label] - 1, + }) + + # 提取活动 + activities = parsed.get('activities', []) + for act in activities: + try: + sess_idx = int(act.get('session', 1)) - 1 + except (ValueError, TypeError): + sess_idx = 0 + tasks = act.get('tasks', ['不明确']) + if 0 <= sess_idx < len(batch): + session_name, session_lines = batch[sess_idx] + all_activities.append((session_name, tasks, session_lines)) + + # 补充没被标注的session + tagged_indices = set() + for act in activities: + try: + tagged_indices.add(int(act.get('session', 0)) - 1) + except (ValueError, TypeError): + pass + for i, (session_name, session_lines) in enumerate(batch): + if i not in tagged_indices: + all_activities.append((session_name, ["不明确"], session_lines)) + + p(f" Batch {batch_idx+1}/{len(batches)}: OK (emo={len(emotions)}, sessions={len(batch)})") + + except Exception as e: + p(f" Batch {batch_idx+1}/{len(batches)}: ERROR {e}") + for session_name, session_lines in batch: + all_activities.append((session_name, ["不明确"], session_lines)) + + p(f"\n[Phase1完成] 情绪={len(all_emotions)}, 活动标注={len(all_activities)} sessions") + + # Phase 2: 归一化活动标签 + p("[Phase2] 归一化标签...") + all_tags = set() + for _, tasks, _ in all_activities: + for t in tasks: + if t != "不明确": + all_tags.add(t) + + all_tags = sorted(all_tags) + p(f" 原始标签数: {len(all_tags)}") + + normalize_map = {} + if all_tags: + batch_size = 150 + for i in range(0, len(all_tags), batch_size): + chunk = all_tags[i:i+batch_size] + prompt = NORMALIZE_PROMPT + "\n\n标签列表:\n" + json.dumps(chunk, ensure_ascii=False) + try: + result = fast_ask(prompt, CFG) + parsed = robust_json_parse(result) + if parsed and isinstance(parsed, dict): + normalize_map.update(parsed) + p(f" 归一化批次 {i//batch_size+1}: {len(parsed)} 条映射") + except Exception as e: + p(f" 归一化批次 {i//batch_size+1}: ERROR {e}") + + p(f" 归一化映射总数: {len(normalize_map)}") + + # Phase 3: 构建活动矩阵 + p("[Phase3] 构建矩阵...") + + # 矩阵: {normalized_task: {week: count}} + matrix = defaultdict(lambda: defaultdict(int)) + task_sessions = defaultdict(list) # {task: [(session_name, text)]} + + for session_name, tasks, session_lines in all_activities: + week = get_week(session_name) + for task in tasks: + if task == "不明确": + continue + normalized = normalize_map.get(task, task) + matrix[normalized][week] += 1 + # 收集source_lines(每个task最多保留15条) + for ln, text in session_lines[:3]: + task_sessions[normalized].append({'text': text, 'session': session_name}) + + # 收集所有周 + all_weeks = sorted(set(w for task_weeks in matrix.values() for w in task_weeks.keys())) + p(f" 任务数: {len(matrix)}, 周数: {len(all_weeks)}") + + # 输出 activity_matrix.json + matrix_output = { + 'generated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), + 'weeks': all_weeks, + 'tasks': {} + } + for task, week_counts in sorted(matrix.items(), key=lambda x: sum(x[1].values()), reverse=True): + matrix_output['tasks'][task] = { + 'total': sum(week_counts.values()), + 'by_week': dict(week_counts), + } + + with open(MATRIX_FILE, 'w', encoding='utf-8') as f: + json.dump(matrix_output, f, ensure_ascii=False, indent=2) + p(f" activity_matrix.json 已保存 ({len(matrix_output['tasks'])} tasks)") + + # Phase 4: 判定习惯和消失 + p("[Phase4] 判定习惯/消失...") + + current_week = get_current_week() + recent_weeks = all_weeks[-4:] if len(all_weeks) >= 4 else all_weeks + + habits = [] + abandoned = [] + + for task, week_counts in matrix.items(): + total_count = sum(week_counts.values()) + active_weeks = sorted(week_counts.keys()) + span = len(active_weeks) + last_week = active_weeks[-1] if active_weeks else "" + + # 计算gap(当前周 - 最后活跃周) + try: + cur_y, cur_w = current_week.split('-W') + last_y, last_w = last_week.split('-W') + gap = (int(cur_y) - int(last_y)) * 52 + (int(cur_w) - int(last_w)) + except: + gap = 0 + + is_recent = any(w in recent_weeks for w in active_weeks) + + # source_lines: 最多15条 + sources = task_sessions.get(task, [])[:15] + + if span >= 2 and is_recent and total_count >= 3: + habits.append({ + 'task': task, + 'weeks_active': active_weeks, + 'total_count': total_count, + 'span': span, + 'source_lines': sources, + }) + elif total_count >= 3 and not is_recent: + abandoned.append({ + 'task': task, + 'weeks_active': active_weeks, + 'total_count': total_count, + 'last_week': last_week, + 'gap': gap, + }) + + # 排序 + habits.sort(key=lambda x: x['total_count'], reverse=True) + abandoned.sort(key=lambda x: x['total_count'], reverse=True) + + # 限制数量 + habits = habits[:15] + abandoned = abandoned[:30] + + elapsed = round(time.time() - t0, 1) + + # Phase 5: 输出报告 + p("[Phase5] 输出报告...") + + # 情绪排序(按行号) + all_emotions.sort(key=lambda x: x['line_no']) + + # 统计 + total_negative = sum(1 for e in all_emotions if e['label'] == 'NEGATIVE') + total_positive = sum(1 for e in all_emotions if e['label'] == 'POSITIVE') + stats = { + 'total_user_lines': total_user_lines, + 'total_negative': total_negative, + 'total_positive': total_positive, + 'detection_rate': round(len(all_emotions) / max(total_user_lines, 1) * 100, 1), + } + + scan_range = [start_line + 1, total_file_lines] if is_incremental else [1, total_file_lines] + + summary = ( + f"情绪: {len(all_emotions)}条检出 | " + f"习惯: {len(habits)}项 [{', '.join(h['task'] for h in habits[:3])}] | " + f"消失: {len(abandoned)}项 [{', '.join(a['task'] for a in abandoned[:3])}]" + ) + + report = { + 'scan_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), + 'elapsed_seconds': elapsed, + 'emotion': { + 'count': len(all_emotions), + 'scan_range': scan_range, + 'new_lines_scanned': new_lines_count, + 'detections': all_emotions, + 'stats': stats, + }, + 'habits': { + 'count': len(habits), + 'items': habits, + }, + 'abandoned': { + 'count': len(abandoned), + 'items': abandoned, + }, + 'errors': [], + 'summary': summary, + } + + with open(REPORT_FILE, 'w', encoding='utf-8') as f: + json.dump(report, f, ensure_ascii=False, indent=2) + + # 更新状态 + state['last_scan_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + state['emotion_last_line'] = total_file_lines + state['habits_last_scan'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + state['scan_count'] = state.get('scan_count', 0) + 1 + save_state(state) + + p(f"\n{'='*60}") + p(f"完成 ({elapsed}s)") + p(f"输出: {REPORT_FILE}") + p(f"{'='*60}") + p(f"\n{summary}") + + +if __name__ == '__main__': + main() From 89ec808d78a6d73d8c5f9a9755f1c9c908a7e73d Mon Sep 17 00:00:00 2001 From: Agent Date: Wed, 20 May 2026 08:59:40 +0800 Subject: [PATCH 3/3] fix(sop): prevent matrix accumulation bug and tighten habits validation --- memory/history_insight_sop.md | 130 +++++++++++++++++++--------------- 1 file changed, 71 insertions(+), 59 deletions(-) diff --git a/memory/history_insight_sop.md b/memory/history_insight_sop.md index 58436d247..e1899139f 100644 --- a/memory/history_insight_sop.md +++ b/memory/history_insight_sop.md @@ -1,82 +1,94 @@ -# 历史洞察扫描 SOP (history_insight_sop) +# 历史洞察扫描 SOP -从 L4 历史对话中提取三类有价值信息:情绪爆发、持续习惯、消失事项。 +从历史对话中提取:情绪波动点、持续习惯、已完成/消失事项。 ## 路径 -- 数据源: `../memory/L4_raw_sessions/all_user_histories.txt` -- 产物全部在 `./`(temp目录): `batches.json`, `scan_results.json`, `normalize_map.json`, `scan_report.json`, `activity_matrix.json`, `scan_state.json` - -## 流程概览 -P0数据准备 → P1逐批提取(循环) → P2标签归一化+执行脚本 - ---- - -## P0: 数据准备 -前置依赖(若数据源不存在则按顺序生成): -1. 运行 `compress_session.py` → 生成 `all_histories.txt` -2. 从 `all_histories.txt` 过滤掉 `[Agent]:` 行 → 生成 `all_user_histories.txt` - -读 `scan_state.json` 获取 `emotion_last_line`(增量起点,首次=0)。读数据源按 `SESSION:` 分割,提取 `[USER]:` 行(忽略≤5字的),只保留行号>起点的新增行。按session为单位装入批次(每批≤120行),写 `batches.json`。格式:`[{session: "名", lines: [[全局行号, "文本"], ...]}, ...]`。 -## P1: 逐批提取(核心循环) +- 数据源: `../memory/L4_raw_sessions/all_user_histories.txt` +- 生成方式: 若不存在,先运行 `compress_session.py` → 产出 `all_histories.txt`(含Agent行) → 过滤掉 `[Agent] ` 开头的行(空格不是冒号) → 另存为 `all_user_histories.txt` +- ⚠️ `all_histories.txt` 不是数据源,必须用过滤后的 `all_user_histories.txt` +- 产物目录: `./`(temp) +- 下游: `session_traceback.py` — `traceback(query, context_chars=1500, nth=0)` 返回前后文 + +## 产物 + +| 文件 | 持久 | 格式 | +|------|------|------| +| `activity_matrix.json` | ✅ 增量累积 | `{"weeks": ["2025-W11",...], "matrix": {"标签": {"2025-W11": 3}}}` | +| `scan_state.json` | ✅ | `{"last_session": "0519_xxx"}` | +| `scan_report.json` | 每次覆盖写 | 见下方 | +| `task_dict.json` | ❌ 临时 | P2完成后删除 | +| `scan_results.json` | ❌ 临时 | P2完成后删除 | +| `batches.json` | ❌ 临时 | P2完成后删除 | + +scan_report.json 格式: +```json +{ + "scan_date": "2025-05-19", + "data_range": "0401-0519", + "total_sessions": 42, + "emotions": [ + {"session": "0501_xxx", "week": "2025-W18", "trigger": "连续3次修复失败后爆发", "expression": "用户原话前50字", "traceback_query": "用于session_traceback溯源的原文片段"} + ], + "habits": [{"label": "编写SOP", "sessions": ["0501_xxx", "0508_yyy", "0515_zzz"]}], + "abandoned": ["搭建博客", "学习Rust"] +} +``` -⛔ **严格单批处理。已验证:合并多批→后半段数据被编造。** +## 流程 -每轮固定3步,不可变形: -1. code_run:从`batches.json`读第N批(N=已有结果数),打印格式 `[行号] 文本` -2. 分析当前输出文本(禁止凭记忆补充、禁止预读下一批) -3. code_run:将本批结果追加到`scan_results.json`,打印进度`done X/total` +P0 准备 → P1 逐批扫描(循环) → P2 汇总 -读取脚本自动计算进度:`done = len(已有结果)`,只输出`batches[done]`。N批就是N轮循环,不可压缩。 +### P0 -⛔ **最后一批写入后必须打印提醒:** `print("P1完成!下一步:P2写normalize_map.json → P3执行 python ../memory/build_report.py .")` +过滤 `[Agent]` 行并按 session 分割:`sessions = re.findall(r'={5,}\nSESSION:\s*(.+?)\n={5,}\n(.*?)(?=\n={5,}\nSESSION:|\Z)', content, re.DOTALL)`(content 为去掉 `startswith("[Agent] ")` 行后的全文,注意[Agent]后是空格不是冒号)。 以 session 为单位装入批次(每批≤500 行,不拆分 session)。若 `scan_state.json` 存在,跳过 session 名 ≤ `last_session` 的所有 session,只处理之后的。若 `activity_matrix.json` 已存在,提取其标签名列表供 P1 归类参考。 +启动锚定:`update_working_checkpoint: "history_insight | P1逐批中 | P2: scan_report顶层key严格=scan_date,data_range,total_sessions,emotions,habits,abandoned(6个禁自创) | habits项格式={label,sessions[]} | 临时文件仅删task_dict/scan_results/batches | scan_state持久禁删 | 禁止: 自创阶段/自创产物/跳过P2"` -### 情绪检测(高阈值,宁漏不误) +### P1: 逐批扫描 -仅标记: -- 累积不满后的爆发(连续多轮后终于发火) -- 明确愤怒/质问/责备(真的生气,不是追问) -- 强烈讽刺挖苦(带攻击性) -- 极度惊喜感激(远超正常反应) +⛔ 每轮只处理 1 批(合并多批会超出注意力窗口,导致 session 名编造)。禁止在一个 code_run 内用循环/批量处理多批;处理完 1 批后必须结束当前 code_run,下一批在下一轮处理。 -**不标记**:普通不耐烦、催促、语气平和的不满、日常吐槽。判断技巧:去掉情绪修饰后信息量是否减少?减少才标记。 +每轮一个 code_run,固定结构:先从 batches.json 加载当前批次 session 列表 → `valid_sessions`,`batch_index = scan_state.get("last_batch", -1) + 1`,assert `batch_index < len(batches)`(未越界)。处理完毕后 assert 本轮写入的所有 session ∈ valid_sessions。每批结束时增量更新 `activity_matrix.json`(从本批 task_dict 新增条目统计计数)并更新 `scan_state.json`:`{"last_session": ..., "last_batch": batch_index}`。若本批有 emotions,assert 每条 `set(e.keys()) == {"session","week","trigger","expression","traceback_query"}`。 -label: NEGATIVE / POSITIVE。输出: `{"line_no": 行号, "text": "[USER]: 前30字", "label": "X", "reason": "一句话", "traceback_query": "前30字(无[USER]:前缀)", "occurrence_nth": 0}` +1. **情绪检测** — 标记明显的情绪波动。 -`occurrence_nth`: 该文本在全文中第几次出现(从0开始),供 `session_traceback.py` 精确溯源。大多数情况为0。 + 标记:愤怒/质问/责备、讽刺挖苦、惊喜感激、反复纠正后语气变化、沮丧/无奈表达、预期落空后的失望或方向突变。 + 不标记:纯功能性指令、语气始终平和的反馈。 -### 活动识别 -每session提取用户在做什么。标签格式:动词+宾语 4-8字(如"配置远程服务器")。每session 1-5条。不明确标 `["不明确"]`。 + 结果追加 `scan_results.json`,每批一个对象:`{"batch": N, "emotions": [{"session", "week", "trigger", "expression", "traceback_query"}]}`。traceback_query = 用户原话中最具辨识度的连续片段(15-40字,供下游精确匹配)。 -输出: `{"session": "名", "tasks": ["标签1", ...], "text": "该session中最能代表活动的一句用户原文(前30字)"}` +2. **活动归类** — 增量维护 `task_dict.json`(key=动宾短语4-10字)。匹配已有 key 则追加,否则新建。每条:`{"session", "week", "text"}`。归类原则:含相同专有名词才归同一 key;通用动作不归入特定项目;宁多建 key 不错误合并。 +3. **更新矩阵** — 用本轮新构建的 entries 增量更新 `activity_matrix.json`(标签×周 +1)。若文件不存在则新建。 + ```python + # new_entries: 本轮新构建的list,⛔禁止从task_dict.json全量遍历重新统计 + for e in new_entries: + matrix.setdefault(e["label"], {})[e["week"]] = matrix.get(e["label"], {}).get(e["week"], 0) + 1 + ``` + 写完验证:`assert set(json.load(open("activity_matrix.json")).keys()) == {"weeks", "matrix"}` -`text` 字段供 `build_report.py` 生成 `source_lines`,用于后续 `session_traceback.py` 溯源。 +P1 终止:当前批次号 == batches.json 总批次数时,**下一轮重读本 SOP P2 段落并执行**,禁止继续处理或自行总结。 -## P2: 标签归一化 +### P2: 汇总 -⛔ **不可跳过,P3脚本读 `normalize_map.json`,不存在则报错。** +⛔ P1 全部完成后必须执行以下步骤,禁止自行生成其他产物文件。 -⛔ **归一化前必须 code_run 读取已有 `activity_matrix.json` 的标签列表(若存在)。新标签优先映射到已有标签(语义一致时),保证跨次运行标签一致性。** +1. **验证矩阵**:确认 `activity_matrix.json` 已由 P1 增量生成。验证:`assert set(json.load(open("activity_matrix.json")).keys()) == {"weeks", "matrix"}` -提取所有标签去重排序,每批50个进行同义合并。规则: -- 新标签与已有matrix标签语义一致时,映射到已有标签名(优先级最高) -- 保守合并:只合并明确同义词,不确定保持独立 -- 同一功能的子步骤合并为功能级(如"X功能重构"+"X功能测试"+"X功能PR"→"X功能开发"),跨性质保持独立(开发≠文档≠部署) -- "调试bug"≠"修复bug"(动作不同) -- 输出完整映射 `{"原标签": "归一化名", ...}`,独立标签映射为自身 +2. **生成报告**:读 activity_matrix 判定 habits/abandoned,汇总 scan_results 中的情绪,写 `scan_report.json`。 + - habits — 各周计数之和≥3 且性质为「可复用技能」(周期性维护/持续使用的工具能力),⛔ 只出现1-2次 → 忽略 + - abandoned — 曾活跃但已完成/放弃的事项(具体项目开发、一次性配置/调研),只需标签名字符串 + - ⛔ habits 必须用 sessions 数量交叉验证(matrix 计数可能因 bug 虚高),`len(sessions) >= 2` 才可放入 + 顶层 key 严格为 6 个:`scan_date, data_range, total_sessions, emotions, habits, abandoned`(禁止自创 key)。 + 写完验证:`assert set(report.keys()) == {"scan_date","data_range","total_sessions","emotions","habits","abandoned"}`;`assert all(len(h["sessions"]) >= 2 for h in habits)`;若 emotions 非空:`assert set(emotions[0].keys()) == {"session","week","trigger","expression","traceback_query"}`;habits 非空:`assert set(habits[0].keys()) == {"label","sessions"}`;abandoned:`assert all(isinstance(a, str) for a in abandoned)` -⛔ **归一化完成后必须 code_run 写入 `normalize_map.json`,同一个 code_run 内紧接着执行:** -```python -import subprocess, sys -r = subprocess.run([sys.executable.replace("pythonw","python"), "../memory/build_report.py", "."], capture_output=True, text=True) -print(r.stdout); print(r.stderr) -``` -看到 `[BUILD_REPORT_DONE]` 即全部完成。未看到则读stderr排错。 - ---- +3. **收尾(缺一不可)**:单个 code_run 内完成 ① 更新 `scan_state.json`(持久产物,禁止删除) ② 删除且仅删除 `task_dict.json`、`scan_results.json`、`batches.json` 三个临时文件 ③ assert 三个临时文件不存在 + `assert os.path.exists("scan_state.json")` + `assert os.path.exists("activity_matrix.json")` ④ 结束,禁止生成其他文件 ## 坑点 -- session名格式 `MMdd_HHmm-MMdd_HHmm`,取第一个MMdd推算周次 -- `week_str_to_date` 用 `"%Y-W%W-%w"` 格式解析 -- 归一化时标签数可能300+,必须分批处理避免质量退化 -- activity_matrix结构是 `{归一化标签: {week: count}}`(按标签分组),不是 `{week: {标签: count}}`(按周分组) + +- week 从 session 名首个日期 `MMdd` 推算 `YYYY-Wnn`(今年) +- task_dict key 数量 100+ 是正常的 +- P1 禁止代码 if-elif 硬编码判断内容,必须自然语言阅读后用 code_run 写 JSON +- P2 每个产物的 json.dump 和 assert 必须在同一个 code_run 内 +- P2 emotions 只从 scan_results 汇总,禁止凭记忆补充 +- 所有产物文件名严格按本 SOP,禁止改名 +- ⛔ P1 结束后必须进入 P2(生成 scan_report + 清理临时文件),禁止跳过 P2 直接输出总结或生成其他文件