-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcontextforge.py
More file actions
1112 lines (995 loc) · 59.1 KB
/
contextforge.py
File metadata and controls
1112 lines (995 loc) · 59.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# =================================================================================
# File: contextforge.py
# Project: ContextForge(コンテキストフォージ)
# Version: 1.0
#
# Description: ContextForge - LLM入力アーティファクト設計ツール
# プロジェクトコードを収集・評価し、LLM向けの入力パッケージ
# (ZIP/単一ファイル/チャンク群)を生成するスタンドアロンツール。
#
# 依存ライブラリのインストール (初回のみ):
# pip install "gradio>=4.0" python-dotenv networkx requests
#
# 使用方法 (UI):
# python contextforge.py --ui
#
# 使用方法 (CLI):
# python contextforge.py --profile gemini-single-file
# python contextforge.py --profile perplexity-prepare
# =================================================================================
from __future__ import annotations
import argparse
import ast
import datetime
import fnmatch
import hashlib
import json
import os
import re
import shutil
import subprocess
import sys
import tempfile
import time
import traceback
import zipfile
from collections import Counter, defaultdict
from dataclasses import dataclass
from math import log2, isnan
from pathlib import Path
from threading import Event, Thread
from typing import Any, Dict, List, Optional, Set, Tuple, Iterator
# --- オプションライブラリの安全なインポート ---
try:
import gradio as gr
except ImportError:
gr = None
try:
import networkx as nx
except ImportError:
nx = None
try:
import requests
except ImportError:
requests = None
try:
from dotenv import load_dotenv
except ImportError:
load_dotenv = None
# ============================================================================
# 1. 設定とプロファイル
# ============================================================================
# Default output locations, resolved against the current working directory at import time.
DEFAULT_EXPORTS_DIR = Path("./exports").resolve()
DEFAULT_LOGS_DIR = Path("./logs").resolve()
# Prefix that to_win_long() prepends to resolved paths when os.name == "nt".
LONG_PATH_PREFIX = "\\\\?\\"
# JSON file holding accumulated per-extension raw/zip byte totals (see load/update_compression_stats).
COMPRESSION_STATS_FILE = DEFAULT_EXPORTS_DIR / "compression_stats.json"
PERPLEXITY_API_ENDPOINT = "https://api.perplexity.ai/files"
PERPLEXITY_MAX_MB = 25.0
PERPLEXITY_MAX_FILES_PER_DAY = 100
# Baseline zip/raw size ratio per file extension; extensions not listed fall back to 0.5.
BASE_COMPRESSION_RATIOS = defaultdict(lambda: 0.5, {
".py": 0.30, ".md": 0.35, ".txt": 0.40, ".json": 0.25,
".toml": 0.35, ".yaml": 0.40, ".yml": 0.40, ".html": 0.20,
".css": 0.25, ".js": 0.28, ".ts": 0.28, ".sh": 0.45,
})
# File glob patterns shared by every profile's "exclude_globs".
COMMON_EXCLUDE_FILES = [
"*.exe", "*.dll", "*.so", "*.a", "*.lib", "*.o", "*.obj", "*.pyc", "*.pyd", "*.pdb",
"*.png", "*.jpg", "*.jpeg", "*.gif", "*.bmp", "*.svg", "*.webp", "*.ico",
"*.mp4", "*.mov", "*.avi", "*.mp3", "*.wav", "*.ogg", "*.flac",
"*.pdf", "*.docx", "*.pptx", "*.xlsx", "*.epub", "*.chm",
"*.ttf", "*.otf", "*.woff", "*.woff2", "*.eot",
"*.zip", "*.rar", "*.7z", "*.tar", "*.gz", "*.iso", "*.dmg",
"*.db", "*.sqlite3", "*.log", "*.jsonl", "*.DS_Store", "*.swp", "*.swo",
# Secret-leak prevention: environment files, keys and similar are excluded by default.
".env", "*.env", "*.pem", "*.key", "*.p12", "*.pfx", "*id_rsa*",
]
# [FIX] The trailing "/**" was removed from the directory exclude patterns.
COMMON_EXCLUDE_DIRS = [
"**/.git", "**/__pycache__", "**/node_modules",
"**/.venv", "**/venv", "**/env", "**/openenv",
"**/site-packages",
"**/exports", "**/logs", "**/.mypy_cache", "**/.pytest_cache",
"**/.idea", "**/.vscode", "**/build", "**/dist", "**/*.egg-info",
"**/typings",
]
# Extensions additionally collected when fill_policy="relax" (text-based design assets only).
RELAXED_INCLUDE_EXTENSIONS = {".md", ".txt", ".toml", ".yaml", ".yml", ".json"}
# Named export profiles. Keys read by the selection code in this file:
#   target_mb / max_single_mb  - total budget and per-file size cap, in MB
#   selection_mode             - "raw" budgets by raw size; otherwise by predicted zip size
#   fill_to_target/fill_policy - enable the "relax" pass that collects extra text assets
#   exclude_globs              - {"dirs": [...], "files": [...]} glob patterns
#   priority_files             - globs whose matches are placed ahead of the other candidates
# "output_mode", "chunk_target_mb" and "description" are not read in the visible part of the
# file; presumably consumed by the export entry point - TODO confirm.
PROFILES: Dict[str, Dict[str, Any]] = {
"gemini-chronicle": {
"description": "Gemini向け。Git年代記と結合コードを含む4ファイル構成のZIP。",
"target_mb": 9.5, "output_mode": "chronicle_zip", "max_single_mb": 4.0,
"exclude_globs": {"dirs": COMMON_EXCLUDE_DIRS, "files": COMMON_EXCLUDE_FILES},
"priority_files": ["**/readme.md", "**/main.py", "**/app.py", "**/orchestrator.py"],
},
"claude-chronicle-30mb": {
"description": "Claude Web UI向け(30MB制限)。大容量の年代記ZIPパッケージ。",
"target_mb": 28.5, "output_mode": "chronicle_zip", "max_single_mb": 10.0,
"fill_to_target": False, "fill_policy": "strict",
"exclude_globs": {"dirs": COMMON_EXCLUDE_DIRS, "files": COMMON_EXCLUDE_FILES},
"priority_files": [
"**/ARCHITECTURE.md", "**/PROJECT_PROFILE*.md",
"**/readme.md", "**/main.py", "**/app.py", "**/orchestrator.py",
"**/docs/**", "**/spec/**", "**/src/**",
],
},
"claude-chronicle-30mb-fill": {
"description": "Claude Web UI向け(30MB制限・実験用)。relaxで容量を埋める年代記ZIP。",
"target_mb": 28.5, "output_mode": "chronicle_zip", "max_single_mb": 10.0,
"fill_to_target": True, "fill_policy": "relax",
"exclude_globs": {"dirs": COMMON_EXCLUDE_DIRS, "files": COMMON_EXCLUDE_FILES},
"priority_files": [
"**/ARCHITECTURE.md", "**/PROJECT_PROFILE*.md",
"**/readme.md", "**/main.py", "**/app.py", "**/orchestrator.py",
"**/docs/**", "**/spec/**", "**/src/**",
],
},
"gemini-single-file": {
"description": "Gemini(ZIP非対応時)向け。全情報を単一のマークダウンファイルに結合。",
"target_mb": 9.5, "output_mode": "single_file", "max_single_mb": 4.0,
"exclude_globs": {"dirs": COMMON_EXCLUDE_DIRS, "files": COMMON_EXCLUDE_FILES},
"priority_files": ["**/readme.md", "**/main.py", "**/app.py", "**/orchestrator.py"],
},
"gpt5-zip": {
"description": "GPT-5向け。多数のファイルをそのまま格納したZIP/フォルダ(大容量構造ファイルも含む)。",
"target_mb": 80.0, "output_mode": "standard_zip", "max_single_mb": 80.0,
"exclude_globs": {"dirs": COMMON_EXCLUDE_DIRS, "files": COMMON_EXCLUDE_FILES},
"priority_files": ["**/project_structure.json", "**/project_structure.md"],
},
"perplexity-prepare": {
"description": "Perplexity Pro(手動)向け。最適化されたチャンクファイル群を準備。",
"target_mb": 8.0,
"chunk_target_mb": 2.0,
"output_mode": "perplexity_prepare",
"max_single_mb": 1.8,
"selection_mode": "raw",
"exclude_globs": {"dirs": COMMON_EXCLUDE_DIRS, "files": COMMON_EXCLUDE_FILES},
"priority_files": ["**/readme.md", "**/main.py", "**/app.py", "**/orchestrator.py"],
}
}
# ============================================================================
# 2. ユーティリティ & ヘルパー関数
# ============================================================================
class LogSink:
    """Collects log lines in memory, optionally echoes them, and saves them on flush.

    Every line is passed through a filter that masks ``pplx-...`` tokens before it is
    printed or stored, so such keys never reach the console or the log file.
    """

    def __init__(self, logs_dir: Path, dry_run: bool, prefix: str = "ContextForge_build"):
        """Create ``logs_dir`` if needed and derive a timestamped log file path.

        Args:
            logs_dir: Directory that will hold the log file.
            dry_run: Selects the ``dryrun`` / ``run`` tag embedded in the file name.
            prefix: Leading part of the log file name.
        """
        logs_dir.mkdir(parents=True, exist_ok=True)
        run_kind = "dryrun" if dry_run else "run"
        stamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
        self.path = logs_dir / f"{prefix}_{run_kind}_{stamp}.txt"
        self._lines: List[str] = []

    def write(self, line: str, to_console: bool = True):
        """Record one line (after masking ``pplx-`` tokens) and optionally print it."""
        masked = re.sub(r"pplx-([a-zA-Z0-9_=\-]+)", "pplx-********************", str(line))
        if to_console:
            print(masked)
        self._lines.append(masked)

    def get_full_log(self) -> str:
        """Return every recorded line joined by newlines."""
        return "\n".join(self._lines)

    def write_header(self, text: str):
        """Write a section header line preceded by a blank line."""
        self.write(f"\n--- {text} ---")

    def write_heavy_topN(self, heavy_list: List[Tuple[float, Path]], n: int = 10):
        """Log the ``n`` largest ``(size_mb, path)`` entries; no output for an empty list."""
        if not heavy_list:
            return
        self.write_header(f"サイズ超過等により除外されたファイル (上位{n}件)")
        largest_first = sorted(heavy_list, key=lambda entry: entry[0], reverse=True)
        for size_mb, file_path in largest_first[:n]:
            self.write(f"- {size_mb:.2f} MB {file_path}")

    def flush(self, summary_block: str):
        """Append ``summary_block`` and write the whole log to ``self.path``.

        A write failure is reported through the log itself instead of being raised.
        """
        self.write(summary_block)
        try:
            self.path.write_text(self.get_full_log(), encoding="utf-8", errors="replace")
            # Recorded after the file is written, so this line is only visible in memory.
            self.write(f"✅ ログファイルが正常に保存されました: {self.path}", to_console=False)
        except IOError as e:
            self.write(f"❌ ログファイルの書き込みに失敗しました: {self.path}\nエラー: {e}")
def load_compression_ratios(log: LogSink) -> defaultdict:
    """Return per-extension compression ratios, blended with recorded statistics.

    Starts from a copy of ``BASE_COMPRESSION_RATIOS``. When ``COMPRESSION_STATS_FILE``
    exists, every extension with more than 1 KiB of recorded raw data gets a ratio that
    is a weighted mix of the learned ratio (zip bytes / raw bytes) and the base ratio.
    The weight grows with the amount of recorded data (capped at 0.95) and the result is
    clamped to ``[0.05, 0.95]``.

    Args:
        log: Sink that receives progress and warning lines.

    Returns:
        A ``defaultdict`` mapping extension to ratio (unknown extensions use the base default).
    """
    ratios = BASE_COMPRESSION_RATIOS.copy()
    if not COMPRESSION_STATS_FILE.exists():
        return ratios
    try:
        stats = json.loads(COMPRESSION_STATS_FILE.read_text("utf-8"))
    except (OSError, ValueError) as e:
        # ValueError covers both JSON syntax errors and undecodable bytes.
        log.write(f"⚠️ 圧縮統計ファイルの読み込みに失敗: {e}")
        return ratios
    if not isinstance(stats, dict):
        log.write(f"⚠️ 圧縮統計ファイルの読み込みに失敗: unexpected top-level type {type(stats).__name__}")
        return ratios

    log.write_header("過去の圧縮実績から学習した圧縮率を適用")
    for ext, data in stats.items():
        # Skip malformed entries individually instead of discarding the whole file.
        if not isinstance(data, dict):
            continue
        total_raw_bytes = data.get("total_raw_bytes", 0)
        total_zip_bytes = data.get("total_zip_bytes")
        if not isinstance(total_raw_bytes, (int, float)) or not isinstance(total_zip_bytes, (int, float)):
            continue
        if total_raw_bytes <= 1024:
            continue
        learned_ratio = total_zip_bytes / total_raw_bytes
        # Read the base value from the local copy: indexing the module-level defaultdict
        # would insert a new key into the shared table as a side effect.
        base_ratio = ratios[ext]
        weight = min(0.95, total_raw_bytes / (10 * 1024**2))
        final_ratio = (learned_ratio * weight) + (base_ratio * (1 - weight))
        ratios[ext] = max(0.05, min(0.95, final_ratio))
        log.write(f" - {ext}: {ratios[ext]:.2f} (実績: {learned_ratio:.2f}, 重み: {weight:.2f})", to_console=False)
    log.write("...学習結果の適用完了。")
    return ratios
def update_compression_stats(picked_items: List[FileItem], actual_zip_mb: float, log: LogSink):
    """Fold this run's compression result into ``COMPRESSION_STATS_FILE``.

    The overall ratio (actual zip bytes / total raw bytes) of the run is attributed
    proportionally to each extension and added to the accumulated totals.

    Args:
        picked_items: Files that were packaged; only ``path`` and ``size_bytes`` are read.
        actual_zip_mb: Size of the produced archive in MB.
        log: Sink that receives progress and warning lines.
    """
    total_raw_bytes = sum(item.size_bytes for item in picked_items)
    if total_raw_bytes == 0:
        return

    stats: Dict[str, Any] = {}
    if COMPRESSION_STATS_FILE.exists():
        try:
            loaded = json.loads(COMPRESSION_STATS_FILE.read_text("utf-8"))
        except (OSError, ValueError) as e:
            # An unreadable file is replaced by fresh statistics, but not silently.
            log.write(f"⚠️ 圧縮統計ファイルの読み込みに失敗: {e}")
        else:
            if isinstance(loaded, dict):
                stats = loaded

    raw_by_ext: Dict[str, int] = defaultdict(int)
    for item in picked_items:
        raw_by_ext[item.path.suffix.lower()] += item.size_bytes

    overall_ratio = (actual_zip_mb * 1024**2) / total_raw_bytes
    log.write_header("今回の圧縮実績を統計に記録")
    for ext, raw_bytes in raw_by_ext.items():
        entry = stats.get(ext)
        if not isinstance(entry, dict):
            entry = stats[ext] = {}
        # .get() tolerates entries that are missing one of the two counters.
        entry["total_raw_bytes"] = entry.get("total_raw_bytes", 0) + raw_bytes
        entry["total_zip_bytes"] = entry.get("total_zip_bytes", 0) + int(raw_bytes * overall_ratio)
        log.write(f" - {ext}: 生データ +{raw_bytes/1024:.1f} KB", to_console=False)

    try:
        # The exports directory may not exist yet on a first run.
        COMPRESSION_STATS_FILE.parent.mkdir(parents=True, exist_ok=True)
        COMPRESSION_STATS_FILE.write_text(json.dumps(stats, indent=2), "utf-8")
        log.write(f" - 統計ファイルを更新しました: {COMPRESSION_STATS_FILE}")
    except OSError as e:
        log.write(f"⚠️ 圧縮統計ファイルの書き込みに失敗: {e}")
def to_win_long(path: Path) -> str:
    """Return the resolved path as a string, in extended-length form on Windows.

    On non-Windows systems the resolved path is returned unchanged. On Windows the
    ``\\\\?\\`` prefix is added; UNC paths (``\\\\server\\share\\...``) are rewritten to the
    ``\\\\?\\UNC\\server\\share\\...`` form, because simply prepending the prefix to a UNC
    path does not yield a valid extended-length path.

    Args:
        path: Path to convert; it is resolved first.

    Returns:
        The path string, prefixed on Windows unless it already carries a prefix.
    """
    p_str = str(path.resolve())
    if os.name != "nt" or p_str.startswith(LONG_PATH_PREFIX):
        return p_str
    if p_str.startswith("\\\\.\\"):
        # Device namespace path: leave untouched.
        return p_str
    if p_str.startswith("\\\\"):
        return LONG_PATH_PREFIX + "UNC\\" + p_str[2:]
    return LONG_PATH_PREFIX + p_str
def shorten_path(rel_path: Path, max_len: int = 180) -> Path:
    """Shorten an over-long relative path by collapsing its middle components.

    The first two and last two components are kept and everything in between is
    replaced by ``__shortened_<hash>__``, where ``<hash>`` is the first 8 hex digits of
    the SHA-1 of the removed middle part.

    Args:
        rel_path: Path to shorten.
        max_len: Paths whose string form is at most this long are returned unchanged.

    Returns:
        ``rel_path`` itself when it is short enough or has no middle part to collapse,
        otherwise the shortened path.
    """
    path_str = str(rel_path).replace("\\", "/")
    if len(path_str) <= max_len:
        return rel_path
    parts = path_str.split('/')
    if len(parts) <= 4:
        # Head and tail would overlap: inserting the marker would duplicate components
        # and make the path longer, so leave it as it is.
        return rel_path
    head = "/".join(parts[:2])
    tail = "/".join(parts[-2:])
    mid_hash = hashlib.sha1("/".join(parts[2:-2]).encode()).hexdigest()[:8]
    return Path(head) / f"__shortened_{mid_hash}__" / tail
def _glob_pattern_hits(path: Path, pattern: str) -> bool:
    """Return True when ``pattern`` matches ``path`` or a meaningful part of it.

    * The pattern is first tried against the whole path (forward-slash form).
    * A pattern starting with ``**/`` is also tried, without that prefix, against every
      trailing run of path components, so ``**/main.py`` matches ``main.py`` at any depth
      including the top level.
    * Any other pattern is also tried against the file name alone.
    """
    if fnmatch.fnmatch(path.as_posix(), pattern):
        return True
    anchored = pattern
    while anchored.startswith("**/"):
        anchored = anchored[3:]
    if anchored == pattern:
        return fnmatch.fnmatch(path.name, pattern)
    parts = path.parts
    return any(fnmatch.fnmatch("/".join(parts[i:]), anchored) for i in range(len(parts)))


def glob_match(path: Path, patterns: List[str]) -> bool:
    """Return True when ``path`` matches at least one of ``patterns``.

    Matching is done per pattern on whole path components, not on arbitrary substrings,
    so ``**/main.py`` does not match ``domain.py`` and ``.env`` does not match a path that
    merely contains ``.env`` somewhere.
    """
    return which_glob_match(path, patterns) is not None


def which_glob_match(path: Path, patterns: List[str]) -> Optional[str]:
    """Return the first pattern that matches ``path``, or None when none does."""
    candidate = Path(path)
    for p in patterns:
        if _glob_pattern_hits(candidate, p):
            return p
    return None
def get_file_stats(p: Path) -> Tuple[int, float]:
    """Return ``(line_count, byte_entropy)`` for the file at ``p``.

    The line count is taken from the content decoded as UTF-8 with undecodable bytes
    dropped. The entropy is the Shannon entropy of the byte distribution in bits per
    byte (0.0 for an empty file). Any failure yields ``(0, 0.0)``.
    """
    try:
        raw = p.read_bytes()
        line_count = len(raw.decode("utf-8", errors="ignore").splitlines())
        size = len(raw)
        if size <= 0:
            return line_count, 0.0
        probabilities = [occurrences / size for occurrences in Counter(raw).values()]
        return line_count, -sum(prob * log2(prob) for prob in probabilities)
    except Exception:
        return 0, 0.0
def get_file_hash(path: Path) -> str:
    """Return the hex SHA-256 digest of the file, reading it in 8 KiB blocks."""
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        # iter() with a sentinel stops at the first empty read, i.e. end of file.
        for block in iter(lambda: stream.read(8192), b""):
            digest.update(block)
    return digest.hexdigest()
# ============================================================================
# 3. 年代記ジェネレータ
# ============================================================================
class ChronicleGenerator:
    """Builds a Markdown "chronicle" of a project from its recent Git history.

    Commits of the last year (merges excluded) are grouped by week, each week is tagged
    with the theme whose keywords occur in the most commit subjects, and the twelve
    most recent weeks are rendered.
    """

    def __init__(self, root: Path):
        """Remember the repository root and set up the theme keyword table."""
        self.root = root
        self.keyword_themes = {
            "Architecture & Refactoring": ["refactor", "architect", "design", "core", "module"],
            "AI & Agents": ["agent", "llm", "model", "prompt", "ai", "orchestrator"],
            "Features & UI": ["feature", "add", "ui", "gradio", "api", "implement"],
            "Database & State": ["db", "database", "sql", "state", "manager"],
            "Testing & Quality": ["test", "fix", "bug", "ci", "quality", "robust", "error"]
        }

    def _run_git_log(self) -> List[Dict[str, Any]]:
        """Return commits as ``{"hash", "date", "subject"}`` dicts; empty on any failure."""
        if not (self.root / ".git").exists():
            return []
        try:
            cmd = ["git", "log", "--date=short", "--pretty=format:%H<DELIMITER>%ad<DELIMITER>%s", "--no-merges", "--since=1.year.ago"]
            result = subprocess.run(cmd, cwd=self.root, capture_output=True, text=True, encoding='utf-8', errors='ignore')
            if result.returncode != 0:
                return []
            commits: List[Dict[str, Any]] = []
            for line in result.stdout.strip().split("\n"):
                # maxsplit=2 keeps any delimiter text inside the subject intact.
                fields = line.split("<DELIMITER>", 2)
                if len(fields) == 3:
                    commits.append({"hash": fields[0], "date": fields[1], "subject": fields[2]})
            return commits
        except Exception:
            # Best effort: a missing git binary or similar means "no history".
            return []

    def _summarize_by_week(self, commits: List[Dict[str, Any]]) -> Dict[str, List[str]]:
        """Group commit subjects under the ISO date of the Monday of their week."""
        weekly_commits: Dict[str, List[str]] = defaultdict(list)
        for commit in commits:
            try:
                commit_date = datetime.datetime.strptime(commit["date"], "%Y-%m-%d")
            except ValueError:
                continue  # unparsable date: skip this commit
            week_start = commit_date - datetime.timedelta(days=commit_date.weekday())
            weekly_commits[week_start.strftime("%Y-%m-%d")].append(commit["subject"])
        return weekly_commits

    @staticmethod
    def _keyword_pattern(keywords: List[str]) -> Optional["re.Pattern[str]"]:
        """Compile a pattern matching any keyword at the start of an ASCII word.

        The look-behind rejects a keyword that is preceded by an ASCII letter, so "ai"
        no longer matches inside "maintain" while "refactor" still matches "refactoring"
        and keywords directly following non-ASCII text still match.
        """
        if not keywords:
            return None
        return re.compile(r"(?<![a-z])(?:" + "|".join(re.escape(kw) for kw in keywords) + ")")

    def _analyze_theme(self, subjects: List[str]) -> str:
        """Return the theme matched by the most subjects, or "General Updates"."""
        matchers = [(theme, self._keyword_pattern(kws)) for theme, kws in self.keyword_themes.items()]
        theme_counts: Counter = Counter()
        for subject in subjects:
            lowered = subject.lower()
            for theme, pattern in matchers:
                if pattern is not None and pattern.search(lowered):
                    theme_counts[theme] += 1
        return theme_counts.most_common(1)[0][0] if theme_counts else "General Updates"

    def generate(self) -> str:
        """Render the chronicle as Markdown (a short notice when no history is available)."""
        commits = self._run_git_log()
        if not commits:
            return "# 📖 プロジェクト年代記\n\nGit履歴が見つかりませんでした。\n"
        weekly_summary = self._summarize_by_week(commits)
        if not weekly_summary:
            return "# 📖 プロジェクト年代記\n\n利用可能なコミット履歴がありませんでした。\n"
        md = ["# 📖 プロジェクト年代記 (AI-Generated)", "\n**これは、Gitのコミット履歴を基にAIが自動生成したプロジェクトの進化の記録です。**\n"]
        # Most recent twelve weeks first.
        for week_str in sorted(weekly_summary.keys(), reverse=True)[:12]:
            subjects = weekly_summary[week_str]
            md.append(f"---\n### EPOCH: {datetime.datetime.strptime(week_str, '%Y-%m-%d').strftime('%Y年%m月%d日')} の週")
            md.append(f"**テーマ: {self._analyze_theme(subjects)}**\n")
            for subj in subjects[:3]:
                md.append(f"- {subj}")
            if len(subjects) > 3:
                md.append(f"- ...他 {len(subjects) - 3} 件の改善")
            md.append("")
        return "\n".join(md)
# ============================================================================
# 4. ファイル収集・評価・選択
# ============================================================================
@dataclass
class FileItem:
    """One collected file together with the metrics used for scoring and selection."""

    # Path of the file as discovered during the directory walk.
    path: Path
    # Root directory the walk started from.
    root: Path
    # ``path`` relative to ``root``.
    rel_path: Path
    # File size in bytes.
    size_bytes: int
    # Number of text lines.
    loc: int
    # Shannon entropy of the file's bytes.
    entropy: float
    # Priority score; stays 0.0 until a scoring pass assigns it.
    score: float = 0.0
def build_import_map(root: Path, py_files: List[Path]) -> Dict[Path, Set[Path]]:
    """Map each Python file to the project files it imports.

    Module names are derived from paths relative to ``root`` (``pkg/mod.py`` becomes
    ``pkg.mod``; ``pkg/__init__.py`` is additionally reachable as ``pkg``). For every
    import statement, each imported name is resolved to the longest dotted prefix that
    corresponds to a known file, so ``import pkg.mod``, ``import a, b``,
    ``from pkg import mod`` and relative imports all produce edges. Imports that do
    not resolve to a file in ``py_files`` are ignored.

    Args:
        root: Directory that module names are computed against.
        py_files: Python source files to analyse.

    Returns:
        A ``defaultdict(set)`` from importing file to the set of imported files.
        Files that cannot be read or parsed contribute no edges.
    """
    module_map: Dict[str, Path] = {}
    package_inits: Dict[str, Path] = {}
    for p in py_files:
        try:
            parts = p.relative_to(root).with_suffix("").parts
        except ValueError:
            continue
        module_map[".".join(parts)] = p
        if len(parts) > 1 and parts[-1] == "__init__":
            package_inits[".".join(parts[:-1])] = p
    # A real module wins over a package of the same dotted name.
    for dotted, init_file in package_inits.items():
        module_map.setdefault(dotted, init_file)

    def resolve(dotted: str) -> Optional[Path]:
        """Return the file for the longest known prefix of ``dotted``, if any."""
        pieces = dotted.split(".")
        for end in range(len(pieces), 0, -1):
            hit = module_map.get(".".join(pieces[:end]))
            if hit is not None:
                return hit
        return None

    def imported_names(node: ast.AST, package_parts: Tuple[str, ...]) -> List[str]:
        """Return the absolute dotted names referenced by one import node."""
        if isinstance(node, ast.Import):
            return [alias.name for alias in node.names]
        if not isinstance(node, ast.ImportFrom):
            return []
        base_parts = node.module.split(".") if node.module else []
        if node.level:
            # level 1 is the current package, each further level goes one package up.
            climb = node.level - 1
            if climb > len(package_parts):
                return []
            base_parts = list(package_parts[:len(package_parts) - climb]) + base_parts
        base = ".".join(base_parts)
        if not base:
            return [alias.name for alias in node.names]
        # "base.name" falls back to "base" during resolution when name is not a module.
        return [f"{base}.{alias.name}" for alias in node.names]

    import_map: Dict[Path, Set[Path]] = defaultdict(set)
    for pf in py_files:
        try:
            package_parts = pf.relative_to(root).parent.parts
        except ValueError:
            package_parts = ()
        try:
            tree = ast.parse(pf.read_text("utf-8", errors="ignore"), filename=str(pf))
        except Exception:
            # Best effort: an unreadable or unparsable file simply has no edges.
            continue
        for node in ast.walk(tree):
            for dotted in imported_names(node, package_parts):
                target_path = resolve(dotted)
                if target_path is not None:
                    import_map[pf].add(target_path)
    return import_map
def score_files(items: List[FileItem], log: LogSink) -> None:
    """Assign a priority score to every item in place.

    The score is the sum of: an extension weight, the first matching path-keyword
    weight, the first matching name-keyword weight, 20x the file's degree centrality in
    the import graph (only when networkx is available and there are ``.py`` files), and
    ``log2(loc + 1)`` capped at 5 for non-empty files.

    Args:
        items: Files to score; each item's ``score`` attribute is overwritten.
        log: Sink that receives progress lines.
    """
    python_sources = [entry.path for entry in items if entry.path.suffix == ".py"]
    centrality: Dict[Path, float] = {}
    if nx and python_sources:
        log.write_header("Importグラフ解析")
        graph = nx.DiGraph(build_import_map(items[0].root, python_sources))
        centrality = nx.degree_centrality(graph)
        log.write(f" - 解析完了: {len(graph.nodes)}ノード, {len(graph.edges)}エッジ")

    path_weights = {"src": 2, "app": 2, "core": 2}
    name_weights = {"orchestrator": 10, "main": 8, "run": 8, "api": 5, "routes": 5}
    ext_weights = {".py": 3, ".toml": 2, ".yaml": 2, ".md": 1, ".txt": 1}

    def first_weight(table: Dict[str, int], haystack: str) -> int:
        """Return the weight of the first table key contained in ``haystack`` (else 0)."""
        for needle, weight in table.items():
            if needle in haystack:
                return weight
        return 0

    log.write_header("ファイルスコアリング")
    for entry in items:
        lowered = str(entry.rel_path).lower()
        total = 0
        total += ext_weights.get(entry.path.suffix, 0)
        total += first_weight(path_weights, lowered)
        total += first_weight(name_weights, lowered)
        total += centrality.get(entry.path, 0) * 20
        if entry.loc > 0:
            total += min(log2(entry.loc + 1), 5)
        entry.score = total
def collect_and_score_files(
    root: Path, exclude_globs: Dict[str, List[str]], log: LogSink,
    diagnose: Optional[Dict[str, Any]] = None
) -> List[FileItem]:
    """Walk ``root``, collect every non-excluded, non-empty file and score it.

    Args:
        root: Directory to scan.
        exclude_globs: ``{"dirs": [...], "files": [...]}`` glob patterns. Matching
            directories are pruned from the walk; matching files are skipped.
        log: Sink that receives progress lines.
        diagnose: Optional dict that is filled with counters (walked dirs/files and the
            reasons for exclusion/inclusion). Existing keys are kept and incremented.

    Returns:
        The collected items sorted by descending score.
    """
    log.write_header(f"ファイル収集開始: {root}")
    items: List[FileItem] = []
    if diagnose is not None:
        diagnose.setdefault("walk_total_dirs", 0)
        diagnose.setdefault("walk_total_files", 0)
        diagnose.setdefault("excluded_by_dir", {})
        diagnose.setdefault("excluded_by_ext", {})
        diagnose.setdefault("excluded_by_size", 0)
        diagnose.setdefault("excluded_by_error", {})
        diagnose.setdefault("included_by_ext", {})
        diagnose.setdefault("included_by_dir", {})

    def bump(counter_name: str, key: str) -> None:
        """Increment ``diagnose[counter_name][key]`` when diagnostics are enabled."""
        if diagnose is not None:
            bucket = diagnose[counter_name]
            bucket[key] = bucket.get(key, 0) + 1

    normalized_exclude_dirs = [str(Path(p)) for p in exclude_globs["dirs"]]

    def excluded_dir_pattern(directory: Path) -> Optional[str]:
        """Return the first directory pattern matching ``directory``, or None."""
        # "**/name" needs a separator in front of the name. Anchoring at the filesystem
        # root gives relative top-level directories (e.g. ".git" when root is ".") that
        # separator; absolute paths are unaffected by the join.
        probe = str(Path(os.sep) / directory)
        for pattern in normalized_exclude_dirs:
            if fnmatch.fnmatch(probe, pattern):
                return pattern
        return None

    for dirpath, dirnames, filenames in os.walk(root, topdown=True):
        current_dir = Path(dirpath)
        if diagnose is not None:
            diagnose["walk_total_dirs"] += 1

        kept_dirs = []
        for d in dirnames:
            matched = excluded_dir_pattern(current_dir / d)
            if matched is None:
                kept_dirs.append(d)
            else:
                bump("excluded_by_dir", matched)
        # In-place assignment is what makes os.walk skip the pruned directories.
        dirnames[:] = kept_dirs

        for filename in filenames:
            path = current_dir / filename
            if diagnose is not None:
                diagnose["walk_total_files"] += 1
            # One evaluation yields both the verdict and the pattern to report.
            pat = which_glob_match(path, exclude_globs["files"])
            if pat is not None:
                if pat:
                    bump("excluded_by_ext", pat)
                continue
            try:
                size = path.stat().st_size
                if size <= 0:
                    if diagnose is not None:
                        diagnose["excluded_by_size"] += 1
                    continue
                loc, entropy = get_file_stats(path)
                item = FileItem(path=path, root=root, rel_path=path.relative_to(root), size_bytes=size, loc=loc, entropy=entropy)
                items.append(item)
                if diagnose is not None:
                    ext = (item.path.suffix or "(no ext)").lower()
                    bump("included_by_ext", ext)
                    top_dir = item.rel_path.parts[0] if len(item.rel_path.parts) > 1 else "."
                    bump("included_by_dir", top_dir)
            except Exception as e:
                # Best effort: an unreadable file is counted by error type and skipped.
                bump("excluded_by_error", type(e).__name__)
                continue

    log.write(f" - 収集完了: {len(items)} ファイル")
    score_files(items, log)
    return sorted(items, key=lambda x: x.score, reverse=True)
def collect_relaxed_additional(
    root: Path, exclude_globs: Dict[str, List[str]], picked_paths: Set[Path],
    profile: Dict[str, Any], log: LogSink
) -> List[FileItem]:
    """Collect extra text-based design assets for ``fill_policy="relax"``.

    Walks ``root`` with the directory exclusions relaxed (``**/build`` and ``**/dist``
    are allowed) and returns files that are not already picked, have an extension in
    ``RELAXED_INCLUDE_EXTENSIONS``, are not matched by the file exclusions, are
    non-empty and do not exceed the profile's ``max_single_mb``.

    Args:
        root: Directory to scan.
        exclude_globs: ``{"dirs": [...], "files": [...]}`` glob patterns.
        picked_paths: Paths that were already selected and must be skipped.
        profile: Profile dict; ``max_single_mb`` and ``priority_files`` are read.
        log: Unused; kept for signature compatibility.

    Returns:
        Matching items, those matching a priority glob first.
    """
    max_single_bytes = profile["max_single_mb"] * 1024**2
    priority_globs = profile.get("priority_files", [])
    relaxed_dirs = [d for d in exclude_globs["dirs"] if d not in ("**/build", "**/dist")]
    normalized_exclude_dirs = [str(Path(p)) for p in relaxed_dirs]

    def is_excluded_dir(directory: Path) -> bool:
        """Return True when ``directory`` matches one of the relaxed dir patterns."""
        # Anchor at the filesystem root so "**/name" also matches relative top-level
        # directories (which otherwise lack the separator the pattern requires).
        probe = str(Path(os.sep) / directory)
        return any(fnmatch.fnmatch(probe, p) for p in normalized_exclude_dirs)

    additional: List[FileItem] = []
    for dirpath, dirnames, filenames in os.walk(root, topdown=True):
        current_dir = Path(dirpath)
        # In-place assignment prunes the walk.
        dirnames[:] = [d for d in dirnames if not is_excluded_dir(current_dir / d)]
        for filename in filenames:
            path = current_dir / filename
            if path in picked_paths:
                continue
            if path.suffix.lower() not in RELAXED_INCLUDE_EXTENSIONS:
                continue
            if glob_match(path, exclude_globs["files"]):
                continue
            try:
                size = path.stat().st_size
                if size > max_single_bytes or size <= 0:
                    continue
                loc, entropy = get_file_stats(path)
                additional.append(FileItem(path=path, root=root, rel_path=path.relative_to(root), size_bytes=size, loc=loc, entropy=entropy))
            except Exception:
                # Best effort: files that cannot be inspected are skipped.
                continue

    # Evaluate the priority globs once per item, then list priority items first.
    flagged = [(item, glob_match(item.rel_path, priority_globs)) for item in additional]
    priority_items = [item for item, is_priority in flagged if is_priority]
    remaining_items = [item for item, is_priority in flagged if not is_priority]
    return priority_items + remaining_items
def _estimate_packed_size(item: FileItem, selection_mode: str, compression_ratios: defaultdict) -> float:
    """Return the number of bytes ``item`` is expected to consume from the budget.

    In ``'raw'`` mode this is the file size. Otherwise it is the predicted compressed
    size: the per-extension ratio scaled by an entropy factor and clamped to
    ``[0.05, 0.95]``; a NaN prediction falls back to the raw size.
    """
    if selection_mode == 'raw':
        return item.size_bytes
    base_ratio = compression_ratios[item.path.suffix.lower()]
    entropy_factor = 1.0 - (abs(item.entropy - 4.5) / 8.0) * 0.4
    final_ratio = max(0.05, min(0.95, base_ratio * entropy_factor))
    predicted_zip_bytes = item.size_bytes * final_ratio
    if isnan(predicted_zip_bytes):
        return item.size_bytes
    return predicted_zip_bytes


def select_files(
    items: List[FileItem], profile: Dict[str, Any], compression_ratios: defaultdict,
    root: Optional[Path] = None, exclude_globs: Optional[Dict[str, List[str]]] = None, log: Optional[LogSink] = None
) -> Tuple[List[FileItem], List[Tuple[float, Path]]]:
    """Pick files for the package without exceeding the profile's size budget.

    Items matching the profile's ``priority_files`` are considered first, then the rest
    in their given order. A file is skipped (and reported as "heavy") when it exceeds
    ``max_single_mb`` or would push the running total above ``target_mb``. When the
    profile sets ``fill_to_target`` with ``fill_policy == "relax"`` and less than 99% of
    the budget is used, additional text assets are collected from ``root`` and added
    while they fit.

    Args:
        items: Candidate files, already ordered by preference.
        profile: Profile dict (``target_mb``, ``max_single_mb`` and optional
            ``selection_mode``, ``priority_files``, ``fill_to_target``, ``fill_policy``).
        compression_ratios: Per-extension ratio table used outside ``'raw'`` mode.
        root: Scan root; required for the relax pass.
        exclude_globs: Exclusion patterns; required for the relax pass.
        log: Optional sink for the fill-mode summary lines.

    Returns:
        ``(picked, heavy)`` where ``heavy`` holds ``(size_mb, rel_path)`` of files skipped
        in the main pass.
    """
    target_bytes = profile["target_mb"] * 1024 * 1024
    max_single_bytes = profile["max_single_mb"] * 1024**2
    selection_mode = profile.get("selection_mode", "compressed")
    priority_globs = profile.get("priority_files", [])
    picked: List[FileItem] = []
    heavy: List[Tuple[float, Path]] = []
    current_size = 0.0

    # Evaluate the priority globs once per item, then put priority items first.
    flagged = [(item, glob_match(item.rel_path, priority_globs)) for item in items]
    sorted_items = [item for item, hit in flagged if hit] + [item for item, hit in flagged if not hit]

    picked_paths: Set[Path] = set()
    for item in sorted_items:
        if item.path in picked_paths:
            continue
        if item.size_bytes > max_single_bytes:
            heavy.append((item.size_bytes / 1024**2, item.rel_path))
            continue
        size_to_add = _estimate_packed_size(item, selection_mode, compression_ratios)
        if current_size + size_to_add > target_bytes:
            heavy.append((item.size_bytes / 1024**2, item.rel_path))
            continue
        picked.append(item)
        picked_paths.add(item.path)
        current_size += size_to_add

    fill_to_target = profile.get("fill_to_target", False)
    fill_policy = profile.get("fill_policy", "strict")
    relax_attempted = False
    relax_added_count = 0
    size_before_relax = current_size
    if fill_to_target and fill_policy == "relax" and root and exclude_globs and current_size < target_bytes * 0.99:
        relax_attempted = True
        additional = collect_relaxed_additional(root, exclude_globs, picked_paths, profile, log)
        for item in additional:
            if item.path in picked_paths:
                continue
            if item.size_bytes > max_single_bytes:
                continue
            size_to_add = _estimate_packed_size(item, selection_mode, compression_ratios)
            if current_size + size_to_add > target_bytes:
                continue
            picked.append(item)
            picked_paths.add(item.path)
            current_size += size_to_add
            relax_added_count += 1

    if log and relax_attempted:
        relax_delta_mb = (current_size - size_before_relax) / 1024**2
        if relax_added_count > 0:
            log.write(f" - fill_mode=relax: 発動。追加 {relax_added_count} 件、推定サイズ +{relax_delta_mb:.2f} MB")
        else:
            log.write(f" - fill_mode=relax: 発動。追加可能ファイルなし ({current_size/1024**2:.2f} MB)")
    if fill_to_target and log:
        if fill_policy == "strict":
            log.write(f" - fill_mode=strict: 候補内で最大選択。目標未達の場合はそのまま終了 ({current_size/1024**2:.2f} MB)")
        elif fill_policy == "relax" and not relax_attempted:
            log.write(f" - fill_mode=relax: strictで目標達成のため発動せず ({current_size/1024**2:.2f} MB)")
    return picked, heavy
# ============================================================================
# 5. 出力ファイル生成 & Perplexityアップロード
# ============================================================================
def create_report_md(
    items: List[FileItem],
    profile_name: str,
    total_predicted_zip_bytes: float,
    actual_zip_mb: Optional[float],
    heavy: Optional[List[Tuple[float, Path]]] = None,
) -> str:
    """Build the Markdown report that accompanies a generated package.

    The report contains an overview, a per-extension statistics table, an optional list
    of the ten largest excluded files, a self-diagnosis section (code purity and, when
    the archive size is known, information density and prediction accuracy) and a
    recommended bootstrap prompt.

    Args:
        items: Packaged files; ``path``, ``loc`` and ``size_bytes`` are read.
        profile_name: Profile name; a name containing "gemini" selects the report title.
        total_predicted_zip_bytes: Predicted archive size in bytes.
        actual_zip_mb: Actual archive size in MB, or None when no archive was produced.
        heavy: Optional ``(size_mb, rel_path)`` entries for excluded files.

    Returns:
        The report as a single Markdown string.
    """
    stats = defaultdict(lambda: {"files": 0, "lines": 0})
    for item in items:
        ext_raw = (item.path.suffix or "NoExt").strip() or "NoExt"
        ext = ext_raw.split()[0]  # drop anything after whitespace in the suffix
        stats[ext]["files"] += 1
        stats[ext]["lines"] += item.loc
    # The right-hand side is evaluated before "total" is inserted, so it sums real rows only.
    stats["total"] = {"files": len(items), "lines": sum(s["lines"] for s in stats.values())}
    table_rows = sorted(stats.items(), key=lambda x: x[1]['files'], reverse=True)
    stats_table = "|拡張子|ファイル数|コード行数|\n|---|---|---|\n" + "\n".join(
        f"|`{e}`|{d['files']:,}|{d['lines']:,}|" for e, d in table_rows
    )

    report_lines = ["\n## 4. パッケージ品質レポート (自己診断)"]
    good_exts = {".py", ".md", ".txt", ".json", ".toml", ".yaml", ".yml", ".ini", ".cfg", ".sh", ".bat"}
    good_files = sum(1 for item in items if item.path.suffix.lower() in good_exts)
    total_files = len(items) if items else 1  # avoid division by zero for an empty package
    purity_score = (good_files / total_files) * 100
    report_lines.append(f"### コード純度: {purity_score:.1f}%")
    if purity_score < 80:
        report_lines.append("- ⚠️ **警告**: LLM入力に不要なファイルが多数含まれています。除外ルールを見直してください。")
    else:
        report_lines.append("- ✅ **評価**: パッケージは高品質なソースコードとドキュメントで構成されています。")

    if actual_zip_mb is not None:
        info_density = (sum(i.size_bytes for i in items) / 1024**2) / actual_zip_mb if actual_zip_mb > 0 else 0
        report_lines.append(f"### 情報密度: {info_density:.2f} (Raw/Zip Ratio)")
        if info_density < 3.0:
            report_lines.append("- 🟡 **情報**: 圧縮率の低いファイルが含まれている可能性があります。")
        else:
            report_lines.append("- ✅ **評価**: 効率的に圧縮されており、多くの情報が含まれています。")
        pred_accuracy = (1 - abs(actual_zip_mb - total_predicted_zip_bytes / 1024**2) / (total_predicted_zip_bytes / 1024**2)) * 100 if total_predicted_zip_bytes > 0 else 100
        report_lines.append(f"### 予測精度: {pred_accuracy:.1f}%")
        if pred_accuracy < 80:
            report_lines.append("- 🟡 **情報**: 予測と実際のZIPサイズに乖離があります。自己学習機能により次回以降精度が向上します。")
        else:
            report_lines.append("- ✅ **評価**: 圧縮サイズの予測は非常に正確です。")

    excluded_lines = []
    if heavy:
        heavy_sorted = sorted(heavy, key=lambda h: h[0], reverse=True)
        excluded_lines.append("\n## 3. 除外された大容量ファイル (Top 10)")
        for sz_mb, rel in heavy_sorted[:10]:
            excluded_lines.append(f"- {sz_mb:.2f} MB\t{rel}")
        if len(heavy_sorted) > 10:
            excluded_lines.append(f"- ...他 {len(heavy_sorted) - 10} 件")

    bootstrap_prompt = f"""
---
## 5. 推奨ブートストラップ・プロンプト (AIへの最初の指示)
あなたはシニアソフトウェアアーキテクトです。添付されたプロジェクトコンテキスト(パッケージ)を利用し、その概要を報告してください。
**入力アーティファクトの形式**: `{profile_name}`
**利用ステップ**:
1. **歴史の理解 (`PROJECT_CHRONICLE.md`)**: プロジェクトの進化の歴史と主要な開発テーマを把握してください。
2. **定量的データの確認 (`PROJECT_INFO.md` or `MANIFEST_REPORT.md`)**: プロジェクトの規模(ファイル数、コード行数)と品質(コード純度、情報密度など)を確認してください。
3. **ソースコードの確認 (`COMBINED_CODE.py` or `*.zip`)**: ソースコード全体をレビューし、主要なエントリーポイント、設計思想、外部依存関係を特定してください。
4. **総合報告**: 上記を統合し、このプロジェクトが**何をするためのもので、どのような技術的特徴を持っているか**を簡潔に要約してください。
"""
    report_title = "プロジェクト情報" if "gemini" in profile_name else "マニフェストレポート"
    excluded_block = "\n".join(excluded_lines) if excluded_lines else ""
    # Joined outside the f-string: a backslash inside an f-string expression is a
    # SyntaxError on Python versions before 3.12.
    report_block = "\n".join(report_lines)
    return (
        f"# 📦 {report_title}\n"
        f"## 1. 概要\n- 総ファイル数: {stats['total']['files']:,}\n- 総コード行数: {stats['total']['lines']:,}\n"
        f"## 2. 統計\n{stats_table}\n"
        f"{excluded_block}\n"
        f"{report_block}"
        f"{bootstrap_prompt}"
    )
def create_combined_code(items: List[FileItem], log: LogSink) -> str:
    """Concatenate the text of all items into one string with START/END banners.

    Files are read as UTF-8 with undecodable bytes dropped. A file that cannot be read
    is represented by a single ``# ERROR reading ...`` line.

    Args:
        items: Files to combine; ``path`` and ``rel_path`` are read.
        log: Sink that receives progress lines.

    Returns:
        The combined text, sections joined by newlines.
    """
    log.write_header("結合コードを生成中")
    sections = [f"# === COMBINED SOURCE CODE ({len(items)} files) ==="]
    for entry in items:
        try:
            body = entry.path.read_text("utf-8", errors="ignore")
            opening = f"\n# {'='*20} START OF: {entry.rel_path} {'='*20}"
            closing = f"# {'='*22} END OF: {entry.rel_path} {'='*22}"
            sections += [opening, body, closing]
        except Exception as e:
            sections.append(f"# ERROR reading {entry.rel_path}: {e}")
    log.write(" - 生成完了")
    return "\n".join(sections)
def create_chunked_code_files(items: List[FileItem], output_dir: Path, chunk_target_mb: float, log: LogSink) -> List[Path]:
    """Write the combined source of ``items`` as ``COMBINED_CODE_<n>.txt`` chunks.

    Files are appended whole (with START/END banners) to the current chunk; a new
    chunk is started when adding the next file would exceed ``chunk_target_mb`` and the
    current chunk already holds content. A single file larger than the target is not
    split. A file that cannot be read is represented by an ``# ERROR reading ...`` line.

    Args:
        items: Files to combine; ``path`` and ``rel_path`` are read.
        output_dir: Directory for the chunk files; created when missing.
        chunk_target_mb: Target chunk size in MB.
        log: Sink that receives progress lines.

    Returns:
        Paths of the chunk files written, in order.

    Raises:
        OSError: When a chunk file cannot be written.
    """
    log.write_header("結合コードをチャンク分割中...")
    chunk_target_bytes = chunk_target_mb * 1024 * 1024
    output_dir.mkdir(parents=True, exist_ok=True)
    chunk_paths: List[Path] = []

    def new_chunk(number: int) -> Tuple[List[str], int]:
        """Return the initial content list and byte size (header only) of a chunk."""
        header = f"# === COMBINED SOURCE CODE ({len(items)} files) - PART {number} ===\n"
        return [header], len(header.encode('utf-8'))

    def save_chunk(number: int, pieces: List[str], size_bytes: int) -> None:
        """Write one chunk to disk and record its path."""
        chunk_path = output_dir / f"COMBINED_CODE_{number}.txt"
        chunk_path.write_text("".join(pieces), encoding="utf-8")
        log.write(f" - チャンク {number} を保存しました: {chunk_path.name} ({size_bytes / 1024**2:.2f} MB)")
        chunk_paths.append(chunk_path)

    chunk_num = 1
    current_chunk_content, header_bytes = new_chunk(chunk_num)
    current_chunk_bytes = header_bytes

    for item in items:
        # Only the read is guarded: a failure while writing a chunk must not be
        # reported as a read error, nor silently drop the current item.
        try:
            content = item.path.read_text("utf-8", errors="ignore")
        except Exception as e:
            # Trailing newline keeps consecutive error lines from running together.
            error_line = f"# ERROR reading {item.rel_path}: {e}\n"
            current_chunk_content.append(error_line)
            current_chunk_bytes += len(error_line.encode('utf-8'))
            continue

        file_header = f"\n# {'='*20} START OF: {item.rel_path} {'='*20}\n"
        file_footer = f"\n# {'='*22} END OF: {item.rel_path} {'='*22}\n"
        section = file_header + content + file_footer
        section_bytes = len(section.encode('utf-8'))

        if current_chunk_bytes + section_bytes > chunk_target_bytes and current_chunk_bytes > header_bytes:
            save_chunk(chunk_num, current_chunk_content, current_chunk_bytes)
            chunk_num += 1
            current_chunk_content, header_bytes = new_chunk(chunk_num)
            current_chunk_bytes = header_bytes

        current_chunk_content.append(section)
        current_chunk_bytes += section_bytes

    # Flush the last chunk unless it contains nothing but its header.
    if current_chunk_bytes > header_bytes:
        save_chunk(chunk_num, current_chunk_content, current_chunk_bytes)
    log.write(" - チャンク分割完了。")
    return chunk_paths
# ============================================================================
# 6. ガーディアンモード & メインロジック
# ============================================================================
class GuardianWatcher(Thread):
    """Daemon thread that polls the repository HEAD and regenerates artifacts.

    Every ``watch_interval`` seconds the thread reads ``git rev-parse HEAD``
    under ``root``. Each poll in which HEAD differs from the previously seen
    value counts as one detected commit (several commits landing between two
    polls are therefore counted once). When the counter reaches
    ``commit_trigger`` the "gemini-chronicle" export is run and, if it did not
    raise, the counter is reset.
    """

    def __init__(self, root: str, watch_interval: int, commit_trigger: int, stop_event: Event):
        """Store the polling configuration and open a dedicated guardian log sink."""
        super().__init__(daemon=True)
        self.root = Path(root)
        self.watch_interval = watch_interval
        self.commit_trigger = commit_trigger
        self.stop_event = stop_event
        self.log = LogSink(DEFAULT_LOGS_DIR, dry_run=False, prefix="ContextForge_guardian")

    def _get_last_commit(self) -> Optional[str]:
        """Return the current HEAD hash, or None when it cannot be determined.

        None is returned when ``root`` has no ``.git`` entry or when the git
        invocation fails for any reason (git missing, non-zero exit, ...).
        """
        if not (self.root / ".git").exists():
            return None
        try:
            result = subprocess.run(
                ["git", "rev-parse", "HEAD"],
                cwd=self.root, capture_output=True, text=True, check=True,
            )
        except Exception:
            # Deliberately best-effort: the watcher must keep running even if
            # git is temporarily unusable, so every failure maps to "unknown".
            return None
        return result.stdout.strip()

    def run(self):
        """Poll until ``stop_event`` is set, triggering exports on new commits."""
        self.log.write(f"🛡️ プロジェクト・ガーディアンモード起動 (監視間隔: {self.watch_interval}秒)")
        last_known_commit = self._get_last_commit()
        commit_count_since_last_run = 0
        # Event.wait() doubles as an interruptible sleep: it returns True as
        # soon as a stop is requested, so shutdown does not have to wait out a
        # full polling interval (and no extra poll/export runs after the stop).
        while not self.stop_event.wait(self.watch_interval):
            current_commit = self._get_last_commit()
            if not current_commit:
                self.log.write("...Gitリポジトリが見つかりません。監視を一時停止します。", to_console=False)
                continue
            if current_commit != last_known_commit:
                self.log.write(f"新しいコミットを検知: {current_commit[:7]}")
                last_known_commit = current_commit
                commit_count_since_last_run += 1
                if commit_count_since_last_run >= self.commit_trigger:
                    self.log.write_header(f"{self.commit_trigger}回のコミットを検知。アーティファクトの自動生成を開始します...")
                    try:
                        # The generator only yields log snapshots; drain it to run the export.
                        for _ in export_main_generator(str(self.root), "gemini-chronicle", False, False, False, DEFAULT_EXPORTS_DIR, DEFAULT_LOGS_DIR, self.stop_event):
                            pass
                        self.log.write_header("自動生成完了")
                        commit_count_since_last_run = 0
                    except Exception as e:
                        # Counter is intentionally left untouched so the next
                        # detected commit retries the export.
                        self.log.write(f"❌ 自動生成中にエラーが発生: {e}")
                else:
                    self.log.write(f"...次の自動実行まであと {self.commit_trigger - commit_count_since_last_run} コミット")
        self.log.write("🛡️ ガーディアンモードが停止しました。")
def write_diagnose_report(diagnose: Dict[str, Any], report_path: Path, console_top_n: int = 10, file_top_n: int = 50) -> None:
    """Write the diagnostics report to a Markdown file and print a console summary.

    The file lists the walk totals followed by one section per counter; each
    dict-valued counter becomes a table of its ``file_top_n`` largest entries.
    The console summary prints the totals and the ``console_top_n`` largest
    entries of every non-empty dict-valued counter.
    """
    # Report sections in output order. "excluded_by_size" holds a plain number,
    # every other key holds a {name: count} mapping.
    scalar_key = "excluded_by_size"
    sections = (
        ("excluded_by_dir", "除外(ディレクトリパターン)"),
        ("excluded_by_ext", "除外(ファイルglob)"),
        (scalar_key, "除外(サイズ0)"),
        ("excluded_by_error", "除外(例外)"),
        ("included_by_ext", "採用(拡張子)"),
        ("included_by_dir", "採用(トップディレクトリ)"),
    )

    def _count_of(entry: Tuple[str, Any]) -> Any:
        return entry[1]

    def _top(d: Dict[str, Any], n: int) -> List[Tuple[str, Any]]:
        # Largest counts first; ties keep the mapping's own order (stable sort).
        ranked = list(d.items())
        ranked.sort(key=_count_of, reverse=True)
        return ranked[:n]

    md = [
        "# ContextForge 診断レポート",
        "",
        f"- **walk_total_dirs**: {diagnose.get('walk_total_dirs', 0):,}",
        f"- **walk_total_files**: {diagnose.get('walk_total_files', 0):,}",
        "",
    ]
    for key, title in sections:
        if key == scalar_key:
            md.append(f"## {title}\n\n{diagnose.get('excluded_by_size', 0):,}\n")
            continue
        counts = diagnose.get(key, {})
        if not counts:
            md.append(f"## {title}\n\n(なし)\n")
            continue
        md.extend([f"## {title}\n", "| 項目 | 件数 |", "|---|---|"])
        md.extend(f"| `{name}` | {count:,} |" for name, count in _top(counts, file_top_n))
        omitted = len(counts) - file_top_n
        if omitted > 0:
            md.append(f"| ...他 {omitted} 件 | |")
        md.append("")

    report_path.parent.mkdir(parents=True, exist_ok=True)
    report_path.write_text("\n".join(md), encoding="utf-8")

    print(f"\n--- 診断概要(上位{console_top_n}件) ---")
    print(f"walk_total_dirs: {diagnose.get('walk_total_dirs', 0):,} | walk_total_files: {diagnose.get('walk_total_files', 0):,}")
    print(f"excluded_by_size: {diagnose.get('excluded_by_size', 0):,}")
    for key, _title in sections:
        if key == scalar_key:
            continue  # already printed above as a single number
        counts = diagnose.get(key, {})
        if counts:
            print(f"{key} (top {console_top_n}): {dict(_top(counts, console_top_n))}")
    print(f"レポート: {report_path}\n")
def export_main_generator(
    root_str: str, profile_name: str, emit_zip: bool, emit_folder: bool, dry_run: bool,
    exports_dir: Path, logs_dir: Path, stop_event: Event,
    api_token: Optional[str] = None,
    progress: Optional[gr.Progress] = None,
    diagnose: bool = False
) -> Iterator[str]:
    """Run one export for ``root_str`` and yield the accumulated log text after each stage.

    Stages: load the profile and compression ratios, collect/score files,
    optionally write a diagnose report, select files, then produce the output
    dictated by ``profile["output_mode"]`` ("perplexity_prepare",
    "single_file", "chronicle_zip" or "standard_zip").

    Args:
        root_str: Project root; must be an existing directory.
        profile_name: Key into ``PROFILES``.
        emit_zip / emit_folder: Only consulted in "standard_zip" mode.
        dry_run: Stop after file selection without writing artifacts.
        exports_dir: Destination for generated artifacts.
        logs_dir: Passed to ``LogSink``.
        stop_event: Checked after collection and after selection to cancel.
        api_token: Accepted for interface compatibility; not used in this body.
        progress: Optional progress callback (Gradio-style ``progress(frac, desc=...)``).
        diagnose: When true, also write a ``DIAGNOSE_*.md`` report.

    Yields:
        The full log text so far (``log.get_full_log()``).

    Any exception is caught here, logged with its traceback, and reported as
    a final yield rather than propagated to the caller.
    """
    log = LogSink(logs_dir, dry_run)
    try:
        if not (root := Path(root_str)).is_dir():
            log.write(f"❌ エラー: ルートはディレクトリではありません: {root_str}")
            yield log.get_full_log()
            return
        # Path(".").name is "", which would yield artifact names starting with
        # "_" and empty headings; fall back to the resolved directory name.
        project_name = root.name or root.resolve().name or "cwd"

        if progress:
            progress(0, desc="設定読み込み...")
        profile = PROFILES[profile_name]
        compression_ratios = load_compression_ratios(log)
        yield log.get_full_log()

        if progress:
            progress(0.1, desc="ファイル収集とスコアリング...")
        diagnose_dict: Optional[Dict[str, Any]] = {} if diagnose else None
        items = collect_and_score_files(root, profile["exclude_globs"], log, diagnose=diagnose_dict)
        yield log.get_full_log()
        if diagnose and diagnose_dict is not None:
            ts = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
            root_label = root.resolve().name or "cwd"
            report_path = exports_dir / f"DIAGNOSE_{root_label}_{profile_name}_{ts}.md"
            write_diagnose_report(diagnose_dict, report_path, console_top_n=10, file_top_n=50)
        if stop_event.is_set():
            log.write("⏹️ キャンセルされました")
            yield log.get_full_log()
            return

        if progress:
            progress(0.3, desc="ファイル選択...")
        picked_items, heavy = select_files(
            items, profile, compression_ratios,
            root=root, exclude_globs=profile.get("exclude_globs"), log=log
        )
        total_raw_bytes = sum(item.size_bytes for item in picked_items)

        # Predicted compressed size: per-extension base ratio, scaled by how
        # far the file's entropy is from 4.5, clamped to [0.05, 0.95].
        predicted_zip_bytes_sum = 0.0
        for item in picked_items:
            base_ratio = compression_ratios[item.path.suffix.lower()]
            entropy_factor = 1.0 - (abs(item.entropy - 4.5) / 8.0) * 0.4
            final_ratio = max(0.05, min(0.95, base_ratio * entropy_factor))
            predicted_zip_bytes = item.size_bytes * final_ratio
            if isnan(predicted_zip_bytes):
                predicted_zip_bytes = item.size_bytes
            predicted_zip_bytes_sum += predicted_zip_bytes

        log.write_header(f"ファイル選択結果: {len(picked_items)}件 / 生データ合計 {total_raw_bytes/1024**2:.2f} MB")
        for item in picked_items[:10]:
            log.write(f" - (Top) {item.rel_path} (score: {item.score:.2f})")
        if len(picked_items) > 10:
            log.write(f" - ...他 {len(picked_items) - 10}件")
        log.write_heavy_topN(heavy)
        yield log.get_full_log()
        if stop_event.is_set():
            log.write("⏹️ キャンセルされました")
            yield log.get_full_log()
            return

        if dry_run:
            log.flush(f"👁🗨 ドライラン完了。{len(picked_items)}個のファイルが選択されました。")
            if progress:
                progress(1.0, desc="完了!")
            yield log.get_full_log()
            return

        ts = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
        output_prefix = f"{project_name}_{profile_name.replace(' ', '-')}_{ts}"
        summary = ""
        zip_path = None
        output_mode = profile["output_mode"]

        if output_mode == "perplexity_prepare":
            output_dir = exports_dir / output_prefix
            output_dir.mkdir(parents=True, exist_ok=True)
            log.write(f"ローカル保存先: {output_dir}")
            if progress:
                progress(0.5, desc="コンテンツ生成...")
            log.write_header("Perplexity用コンテンツを生成・保存中...")
            yield log.get_full_log()
            chronicle_content = ChronicleGenerator(root).generate()
            chronicle_path = output_dir / "PROJECT_CHRONICLE.txt"
            chronicle_path.write_text(chronicle_content, encoding="utf-8")
            chunk_target_mb = profile.get("chunk_target_mb")
            if chunk_target_mb:
                code_files_to_upload = create_chunked_code_files(picked_items, output_dir, chunk_target_mb, log)
            else:
                combined_code_content = create_combined_code(picked_items, log)
                code_path = output_dir / "COMBINED_CODE.txt"
                code_path.write_text(combined_code_content, encoding="utf-8")
                code_files_to_upload = [code_path]
            yield log.get_full_log()
            files_to_upload = [chronicle_path] + code_files_to_upload
            summary = (
                f"✅ Perplexity用ファイルの準備が完了しました。\n"
                f" 以下のファイルをPerplexityのWebサイトから手動でアップロードしてください:\n"
                f" フォルダ: {output_dir}\n"
                f" ファイル: {[p.name for p in files_to_upload]}"
            )

        elif output_mode == "single_file":
            if progress:
                progress(0.6, desc="コンポーネント生成...")
            chronicle_md = ChronicleGenerator(root).generate()
            combined_code_py = create_combined_code(picked_items, log)
            yield log.get_full_log()
            report_md = create_report_md(picked_items, profile_name, predicted_zip_bytes_sum, None, heavy)
            if progress:
                progress(0.8, desc="単一ファイルに結合中...")
            final_md_content = "\n\n---\n\n".join([
                f"# プロジェクトコンテキストレポート: {project_name}",
                "## 1. プロジェクト年代記",
                chronicle_md.replace("# 📖 プロジェクト年代記 (AI-Generated)", ""),
                "## 2. プロジェクト情報",
                report_md.replace("# 📦 プロジェクト情報", ""),
                "## 3. 結合ソースコード",
                f"```python\n{combined_code_py}\n```"
            ])
            output_path = exports_dir / f"{output_prefix}.md"
            # Ensure exports directory exists (first-run safety), as the
            # chronicle_zip branch does before writing its archive.
            output_path.parent.mkdir(parents=True, exist_ok=True)
            output_path.write_text(final_md_content, encoding="utf-8")
            summary = f"✅ シングルファイルレポートの生成が完了しました: {output_path}"

        elif output_mode == "chronicle_zip":
            if progress:
                progress(0.6, desc="年代記・結合コード生成...")
            chronicle_md = ChronicleGenerator(root).generate()
            combined_code_py = create_combined_code(picked_items, log)
            yield log.get_full_log()
            readme_md = f"# {project_name} - LLM Input Package"
            zip_path = exports_dir / f"{output_prefix}.zip"
            # Ensure exports directory exists (first-run safety)
            zip_path.parent.mkdir(parents=True, exist_ok=True)
            if progress:
                progress(0.8, desc="ZIPアーカイブ作成...")
            log.write_header(f"ZIPアーカイブ作成: {zip_path}")
            yield log.get_full_log()
            with zipfile.ZipFile(to_win_long(zip_path), "w", zipfile.ZIP_DEFLATED) as zf:
                zf.writestr("PROJECT_CHRONICLE.md", chronicle_md)
                zf.writestr("COMBINED_CODE.py", combined_code_py)
                zf.writestr("README.md", readme_md)
            # The archive is closed first so its on-disk size can go into the
            # report, which is then appended as a fourth member.
            actual_zip_mb_pre = zip_path.stat().st_size / 1024**2
            report_md = create_report_md(picked_items, profile_name, predicted_zip_bytes_sum, actual_zip_mb_pre, heavy)
            with zipfile.ZipFile(to_win_long(zip_path), "a", zipfile.ZIP_DEFLATED) as zf:
                zf.writestr("PROJECT_INFO.md", report_md)
            summary = f"✅ 年代記パッケージの生成が完了しました: {zip_path}"

        elif output_mode == "standard_zip":
            manifest_dir = exports_dir / output_prefix
            if emit_folder:
                if progress:
                    progress(0.5, desc="マニフェストフォルダにコピー中...")
                manifest_dir.mkdir(parents=True, exist_ok=True)
                log.write_header("マニフェストフォルダにコピー中...")
                yield log.get_full_log()
                for i, item in enumerate(picked_items):
                    dest = manifest_dir / shorten_path(item.rel_path)
                    dest.parent.mkdir(parents=True, exist_ok=True)
                    shutil.copy2(to_win_long(item.path), to_win_long(dest))
                    if progress and i % 50 == 0:
                        progress(0.5 + 0.3 * (i / len(picked_items)))
            if emit_zip:
                if progress:
                    progress(0.8, desc="ZIPアーカイブ準備...")
                zip_path = exports_dir / f"{output_prefix}.zip"
                # Without a manifest folder the files are staged in a temp dir.
                target_dir = manifest_dir if emit_folder else exports_dir / f"_temp_{output_prefix}"
                try:
                    if not emit_folder:
                        target_dir.mkdir(parents=True, exist_ok=True)
                        for item in picked_items:
                            dest = target_dir / shorten_path(item.rel_path)
                            dest.parent.mkdir(parents=True, exist_ok=True)
                            shutil.copy2(to_win_long(item.path), to_win_long(dest))
                    report_md = create_report_md(picked_items, profile_name, predicted_zip_bytes_sum, None, heavy)
                    (target_dir / "MANIFEST_REPORT.md").write_text(report_md, encoding="utf-8")
                    if progress:
                        progress(0.9, desc="ZIPアーカイブ作成中...")
                    log.write_header("ZIPアーカイブ作成中...")
                    yield log.get_full_log()
                    shutil.make_archive(str(zip_path.with_suffix('')), 'zip', str(target_dir))
                finally:
                    # Remove the staging copy even when copying/archiving failed
                    # or the generator was closed, so no _temp_* dir is leaked.
                    if not emit_folder:
                        shutil.rmtree(target_dir, ignore_errors=True)
            summary = "✅ 標準ZIPパッケージの生成が完了しました。"

        if zip_path and zip_path.exists():
            actual_zip_mb = zip_path.stat().st_size / 1024**2
            predicted_zip_mb = predicted_zip_bytes_sum / 1024**2
            if predicted_zip_bytes_sum > 0:
                pred_acc = (1 - abs(actual_zip_mb - predicted_zip_mb) / predicted_zip_mb) * 100
            else:
                pred_acc = 100
            summary += f"\n - 予測ZIP: {predicted_zip_bytes_sum/1024**2:.2f} MB / 実際ZIP: {actual_zip_mb:.2f} MB (予測精度: {pred_acc:.1f}%)"
            update_compression_stats(picked_items, actual_zip_mb, log)

        if progress:
            progress(1.0, desc="完了!")
        log.flush(summary)
        yield log.get_full_log()
    except Exception:
        # Top-level boundary: surface the traceback through the log stream
        # instead of raising into the UI / watcher thread.
        tb_str = traceback.format_exc()
        log.write(f"❌ 致命的なエラー:\n{tb_str}")
        log.flush("エラーにより処理が中断されました。")
        if progress:
            progress(1.0, desc="エラー発生")
        yield log.get_full_log()
# ============================================================================
# 7. Purge exports(危険操作)
# ============================================================================
def purge_exports(exports_dir: Path, keep_last: int = 0) -> None:
    """Delete generated artifacts (``*.zip`` / ``*.md``) directly under ``exports_dir``.

    ``compression_stats.json`` and ``DIAGNOSE_*.md`` are never deleted, and
    sub-directories are not touched. When ``keep_last`` is greater than 0 the
    newest ``keep_last`` candidates (by modification time) are kept; otherwise
    every candidate is removed. Progress is reported with ``print``.
    """
    if not exports_dir.is_dir():
        return

    def _is_candidate(entry: Path) -> bool:
        # Regular files only; protected names are excluded before the suffix test.
        if not entry.is_file() or entry.name == "compression_stats.json":
            return False
        if fnmatch.fnmatch(entry.name, "DIAGNOSE_*.md"):
            return False
        return entry.suffix.lower() in (".zip", ".md")

    candidates: List[Path] = [entry for entry in exports_dir.iterdir() if _is_candidate(entry)]
    if not candidates:
        print("purge-exports: 削除対象 0 件")
        return

    # Newest first, so slicing off the head keeps the most recent artifacts.
    newest_first = sorted(candidates, key=lambda entry: entry.stat().st_mtime, reverse=True)
    doomed = newest_first[keep_last:] if keep_last > 0 else newest_first
    if not doomed:
        print(f"purge-exports: keep-last={keep_last} のため削除対象 0 件")
        return

    print(f"purge-exports: 削除対象 {len(doomed)} 件(keep-last={keep_last})")
    for victim in doomed:
        print(f" - {victim.name}")
    for victim in doomed:
        try:
            victim.unlink()
        except OSError as exc:
            print(f" ❌ 削除失敗: {victim.name} - {exc}")
    print("purge-exports: 削除完了")
# ============================================================================
# 8. UI & CLI
# ============================================================================
def create_gradio_interface(initial_api_token: Optional[str] = None):
if not gr: return print("Gradio未インストール。`pip install gradio`でUIが有効になります。")
stop_event = Event()
with gr.Blocks(title="ContextForge", theme=gr.themes.Soft()) as demo:
gr.Markdown("# ContextForge - LLM入力アーティファクト設計ツール")
with gr.Row():
root_tb = gr.Textbox(label="📁 プロジェクトルート", value=str(Path.cwd()))
profile_dd = gr.Dropdown(choices=[(p["description"], p_name) for p_name, p in PROFILES.items()], value="gemini-chronicle", label="📜 プロファイル")
profile_info = gr.Markdown(value=PROFILES["gemini-chronicle"]["description"])
with gr.Accordion("詳細設定", open=False, visible=False) as accordion:
loaded_from_env = bool(initial_api_token)
placeholder = "✅ .envまたは環境変数から読込済" if loaded_from_env else "pplx-..."
api_token_tb = gr.Textbox(label="🔑 Perplexity APIトークン (任意)", type="password", placeholder=placeholder, value="", visible=False)