diff --git a/codewiki/__init__.py b/codewiki/__init__.py index 77f63b9a..e078a354 100644 --- a/codewiki/__init__.py +++ b/codewiki/__init__.py @@ -1,14 +1,13 @@ """ CodeWiki: Transform codebases into comprehensive documentation using AI-powered analysis. -This package provides a CLI tool for generating documentation from code repositories. +This package provides a CLI tool for generating documentation from code repositories, +and an MCP server for IDE-driven documentation generation. """ __version__ = "1.0.1" __author__ = "CodeWiki Contributors" __license__ = "MIT" -from codewiki.cli.main import cli - -__all__ = ["cli", "__version__"] +__all__ = ["__version__"] diff --git a/codewiki/cli/adapters/doc_generator.py b/codewiki/cli/adapters/doc_generator.py index 61b1c1b7..cbda31ca 100644 --- a/codewiki/cli/adapters/doc_generator.py +++ b/codewiki/cli/adapters/doc_generator.py @@ -37,7 +37,8 @@ def __init__( output_dir: Path, config: Dict[str, Any], verbose: bool = False, - generate_html: bool = False + generate_html: bool = False, + commit_id: str = None, ): """ Initialize the CLI documentation generator. 
@@ -48,12 +49,14 @@ def __init__( config: LLM configuration verbose: Enable verbose output generate_html: Whether to generate HTML viewer + commit_id: Git commit SHA for incremental update tracking """ self.repo_path = repo_path self.output_dir = output_dir self.config = config self.verbose = verbose self.generate_html = generate_html + self.commit_id = commit_id self.progress_tracker = ProgressTracker(total_stages=5, verbose=verbose) self.job = DocumentationJob() @@ -178,7 +181,7 @@ async def _run_backend_generation(self, backend_config: BackendConfig): self.progress_tracker.update_stage(0.2, "Initializing dependency analyzer...") # Create documentation generator - doc_generator = DocumentationGenerator(backend_config) + doc_generator = DocumentationGenerator(backend_config, commit_id=self.commit_id) if self.verbose: self.progress_tracker.update_stage(0.5, "Parsing source files...") diff --git a/codewiki/cli/commands/generate.py b/codewiki/cli/commands/generate.py index 1c370cb8..d8c9afe8 100644 --- a/codewiki/cli/commands/generate.py +++ b/codewiki/cli/commands/generate.py @@ -525,6 +525,8 @@ def generate_command( agent_instructions_dict = config.agent_instructions.to_dict() # Create generator + # Get commit_id early so it can be stored in metadata.json for --update support + commit_id = get_git_commit_hash(repo_path) generator = CLIDocumentationGenerator( repo_path=repo_path, output_dir=output_dir, @@ -545,7 +547,8 @@ def generate_command( 'max_depth': max_depth if max_depth is not None else config.max_depth, }, verbose=verbose, - generate_html=github_pages + generate_html=github_pages, + commit_id=commit_id, ) # Run generation @@ -556,7 +559,6 @@ def generate_command( # Get repository info repo_url = None - commit_hash = get_git_commit_hash(repo_path) current_branch = get_git_branch(repo_path) if is_git_repository(repo_path): diff --git a/codewiki/mcp/server.py b/codewiki/mcp/server.py index d54539ea..cc7c6ac4 100644 --- a/codewiki/mcp/server.py +++ 
b/codewiki/mcp/server.py @@ -1,16 +1,27 @@ """ CodeWiki MCP Server. -Exposes documentation generation as MCP tools: - - generate_docs: Generate full documentation for a repository - - analyze_repo: Analyze repository structure and dependencies - - get_module_tree: Get the module clustering for a repository +Provides two sets of tools: + +**Fine-grained tools (IDE-driven, zero LLM config):** + - ``analyze_repo`` — Parse a repo and build a dependency graph (session-based) + - ``read_code_components`` — Read source code for given component IDs + - ``view_repo_file`` — Read-only file/directory browsing + - ``write_doc_file`` — Create a documentation .md file with Mermaid validation + - ``edit_doc_file`` — Edit a documentation file (str_replace / insert / undo) + - ``save_module_tree`` — Persist IDE agent's module clustering + - ``get_processing_order`` — Get leaf-first documentation order + - ``get_prompt`` — Retrieve CodeWiki's prompt templates + - ``close_session`` — Clean up a session + +**Legacy tools (require CodeWiki LLM config):** + - ``generate_docs`` — Full documentation generation (black-box) + - ``get_module_tree`` — Retrieve existing module clustering Usage: - # Run as standalone MCP server (stdio transport) python -m codewiki.mcp.server - # Or register in your MCP client config: + # Cursor / Claude Desktop config: { "mcpServers": { "codewiki": { @@ -30,54 +41,53 @@ from mcp.server import Server from mcp.server.stdio import stdio_server -from mcp.types import ( - TextContent, - Tool, -) +from mcp.types import TextContent, Tool + +from codewiki.mcp.session import SessionStore logger = logging.getLogger(__name__) -# Create the MCP server +# --------------------------------------------------------------------------- +# Global session store (lives for the lifetime of the MCP server process) +# --------------------------------------------------------------------------- +_store = SessionStore() + +# 
--------------------------------------------------------------------------- +# MCP Server instance +# --------------------------------------------------------------------------- server = Server("codewiki") -def _load_config(): - """Load CodeWiki configuration from ~/.codewiki/config.json + keyring.""" - from codewiki.cli.config_manager import ConfigManager - manager = ConfigManager() - if not manager.load(): - raise RuntimeError( - "CodeWiki not configured. Run 'codewiki config set' first." - ) - return manager - +# =================================================================== +# Tool definitions +# =================================================================== -@server.list_tools() -async def list_tools() -> list[Tool]: - """List available CodeWiki MCP tools.""" +def _fine_grained_tools() -> list[Tool]: + """Return the zero-config, IDE-driven tool set.""" return [ Tool( - name="generate_docs", + name="analyze_repo", description=( - "Generate comprehensive AI-powered documentation for a code repository. " - "Analyzes dependencies, clusters modules, and generates markdown documentation." + "Analyze a code repository's structure, dependencies, and components " + "using Tree-sitter AST parsing. Returns a component index and leaf nodes. " + "No LLM required. This is the entry point for the wiki generation pipeline. " + "After calling this, use get_prompt('cluster') to learn clustering rules, " + "then save_module_tree to persist your grouping. " + "INCREMENTAL UPDATE: If docs already exist in output_dir (metadata.json + " + "module_tree.json), the response includes a 'changes' field showing which " + "files changed and which modules need updating. Use this to do targeted " + "edits instead of regenerating everything." 
), inputSchema={ "type": "object", "properties": { "repo_path": { "type": "string", - "description": "Absolute path to the repository to document", + "description": "Absolute path to the repository to analyze", }, "output_dir": { "type": "string", - "description": "Output directory for generated docs (default: ./docs)", - "default": "docs", - }, - "doc_type": { - "type": "string", - "enum": ["api", "architecture", "user-guide", "developer"], - "description": "Type of documentation to generate", + "description": "Output directory for generated docs (default: /docs)", }, "include_patterns": { "type": "string", @@ -92,18 +102,242 @@ async def list_tools() -> list[Tool]: }, ), Tool( - name="analyze_repo", + name="read_code_components", + description=( + "Read the source code for a list of component IDs. " + "Component IDs have the form 'file_path::ComponentName'. " + "Returns the source code with language-aware code fences." + ), + inputSchema={ + "type": "object", + "properties": { + "session_id": { + "type": "string", + "description": "Session ID from analyze_repo", + }, + "component_ids": { + "type": "array", + "items": {"type": "string"}, + "description": "List of component IDs to read", + }, + }, + "required": ["session_id", "component_ids"], + }, + ), + Tool( + name="view_repo_file", + description=( + "Read-only view of a file or directory inside the analyzed repository. " + "Use this to explore code that isn't in the component index." 
+ ), + inputSchema={ + "type": "object", + "properties": { + "session_id": { + "type": "string", + "description": "Session ID from analyze_repo", + }, + "path": { + "type": "string", + "description": "Relative path within the repository", + }, + "view_range": { + "type": "array", + "items": {"type": "integer"}, + "description": "Optional [start_line, end_line] (1-indexed, -1 for end)", + }, + }, + "required": ["session_id", "path"], + }, + ), + Tool( + name="write_doc_file", + description=( + "Create a new markdown documentation file in the output directory. " + "Automatically validates Mermaid diagrams after writing." + ), + inputSchema={ + "type": "object", + "properties": { + "session_id": { + "type": "string", + "description": "Session ID from analyze_repo", + }, + "filename": { + "type": "string", + "description": "Filename for the doc (e.g., 'auth_module.md')", + }, + "content": { + "type": "string", + "description": "Markdown content to write", + }, + }, + "required": ["session_id", "filename", "content"], + }, + ), + Tool( + name="edit_doc_file", + description=( + "Edit an existing documentation file. Supports str_replace (find-and-replace), " + "insert (add text at a line), and undo (revert last edit). " + "Automatically validates Mermaid diagrams after editing." 
+ ), + inputSchema={ + "type": "object", + "properties": { + "session_id": { + "type": "string", + "description": "Session ID from analyze_repo", + }, + "filename": { + "type": "string", + "description": "Filename of the doc to edit", + }, + "command": { + "type": "string", + "enum": ["str_replace", "insert", "undo"], + "description": "Edit command to run", + }, + "old_str": { + "type": "string", + "description": "String to find (required for str_replace)", + }, + "new_str": { + "type": "string", + "description": "Replacement string (for str_replace/insert)", + }, + "insert_line": { + "type": "integer", + "description": "Line number for insert (0-indexed)", + }, + }, + "required": ["session_id", "filename", "command"], + }, + ), + Tool( + name="save_module_tree", + description=( + "Save the IDE agent's module clustering result. " + "Accepts a JSON module tree and persists it to disk. " + "Returns the recommended leaf-first processing order." + ), + inputSchema={ + "type": "object", + "properties": { + "session_id": { + "type": "string", + "description": "Session ID from analyze_repo", + }, + "module_tree": { + "type": "object", + "description": ( + "Module tree dict. Each key is a module name with value " + "{'components': [component_ids], 'children': {nested modules}}" + ), + }, + }, + "required": ["session_id", "module_tree"], + }, + ), + Tool( + name="get_processing_order", + description=( + "Get the leaf-first processing order for documentation generation. " + "Process leaf modules (is_leaf=true) before parent modules." + ), + inputSchema={ + "type": "object", + "properties": { + "session_id": { + "type": "string", + "description": "Session ID from analyze_repo", + }, + }, + "required": ["session_id"], + }, + ), + Tool( + name="get_prompt", description=( - "Analyze a repository's structure, dependencies, and component hierarchy " - "without generating full documentation. Returns file counts, languages, " - "and dependency information." 
+ "Retrieve CodeWiki's prompt templates for each pipeline stage. " + "Available types: cluster, system_complex, system_leaf, user, " + "overview_module, overview_repo. Optionally pass variables to " + "fill in template placeholders." + ), + inputSchema={ + "type": "object", + "properties": { + "prompt_type": { + "type": "string", + "enum": [ + "cluster", + "system_complex", + "system_leaf", + "user", + "overview_module", + "overview_repo", + ], + "description": "Which prompt template to retrieve", + }, + "variables": { + "type": "object", + "description": "Optional template variables to fill in", + }, + }, + "required": ["prompt_type"], + }, + ), + Tool( + name="close_session", + description="Close and clean up an analysis session to free memory.", + inputSchema={ + "type": "object", + "properties": { + "session_id": { + "type": "string", + "description": "Session ID to close", + }, + }, + "required": ["session_id"], + }, + ), + ] + + +def _legacy_tools() -> list[Tool]: + """Return the legacy tools that require CodeWiki LLM configuration.""" + return [ + Tool( + name="generate_docs", + description=( + "[LEGACY — requires 'codewiki config set' first] " + "Generate full documentation for a repository in one shot. " + "For IDE-driven generation, use the fine-grained tools instead." 
@server.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
    """Dispatch one MCP tool invocation and wrap its result as text content.

    Each branch imports its handler lazily, so only the module backing the
    requested tool is loaded. Session-based handlers receive the shared
    module-level ``_store``. Any exception raised while handling the call is
    logged with its traceback and reported to the client as a JSON
    ``{"error": ...}`` payload instead of being propagated.

    Args:
        name: Tool name as advertised by ``list_tools``.
        arguments: Tool arguments supplied by the MCP client.

    Returns:
        A list of ``TextContent`` items carrying the handler's output.
    """
    try:
        # ---- Session-based tools: work without any LLM configuration ----
        if name == "analyze_repo":
            from codewiki.mcp.tools.analysis import handle_analyze_repo
            payload = handle_analyze_repo(arguments, _store)
            return [_text(payload)]

        if name == "read_code_components":
            from codewiki.mcp.tools.code_reader import handle_read_code_components
            payload = handle_read_code_components(arguments, _store)
            return [_text(payload)]

        if name == "view_repo_file":
            from codewiki.mcp.tools.code_reader import handle_view_repo_file
            payload = handle_view_repo_file(arguments, _store)
            return [_text(payload)]

        if name == "write_doc_file":
            from codewiki.mcp.tools.doc_writer import handle_write_doc_file
            # The doc-writer handlers are coroutines and must be awaited.
            payload = await handle_write_doc_file(arguments, _store)
            return [_text(payload)]

        if name == "edit_doc_file":
            from codewiki.mcp.tools.doc_writer import handle_edit_doc_file
            payload = await handle_edit_doc_file(arguments, _store)
            return [_text(payload)]

        if name == "save_module_tree":
            from codewiki.mcp.tools.module_tree import handle_save_module_tree
            payload = handle_save_module_tree(arguments, _store)
            return [_text(payload)]

        if name == "get_processing_order":
            from codewiki.mcp.tools.module_tree import handle_get_processing_order
            payload = handle_get_processing_order(arguments, _store)
            return [_text(payload)]

        if name == "get_prompt":
            from codewiki.mcp.tools.prompt_server import handle_get_prompt
            payload = handle_get_prompt(arguments, _store)
            return [_text(payload)]

        if name == "close_session":
            session_id = arguments["session_id"]
            status = "closed" if _store.remove(session_id) else "not_found"
            return [_text(json.dumps({
                "status": status,
                "session_id": session_id,
            }))]

        # ---- Legacy tools: these build their own TextContent lists ----
        if name == "generate_docs":
            return await _legacy_generate_docs(arguments)

        if name == "get_module_tree":
            return await _legacy_get_module_tree(arguments)

        # Nothing matched: tell the client instead of raising.
        return [_text(json.dumps({"error": f"Unknown tool: {name}"}))]

    except Exception as exc:
        # Top-level boundary: never let a handler failure escape the dispatcher.
        logger.error("Tool %s failed: %s", name, exc, exc_info=True)
        return [_text(json.dumps({"error": str(exc)}))]
Run 'codewiki config set --api-key '"}))] - # Build agent instructions from arguments agent_instructions = {} if arguments.get("doc_type"): agent_instructions["doc_type"] = arguments["doc_type"] @@ -197,11 +496,8 @@ async def _handle_generate_docs(arguments: dict[str, Any]) -> list[TextContent]: from codewiki.src.be.documentation_generator import DocumentationGenerator doc_gen = DocumentationGenerator(backend_config) - - # Run generation await doc_gen.run() - # Collect results generated_files = [] for f in output_dir.iterdir(): if f.suffix in (".md", ".json", ".html"): @@ -213,74 +509,23 @@ async def _handle_generate_docs(arguments: dict[str, Any]) -> list[TextContent]: "files_generated": sorted(generated_files), "file_count": len(generated_files), } - return [TextContent(type="text", text=json.dumps(result, indent=2))] + return [_text(json.dumps(result, indent=2))] -async def _handle_analyze_repo(arguments: dict[str, Any]) -> list[TextContent]: - """Handle analyze_repo tool call — lightweight dependency analysis only.""" - repo_path = Path(arguments["repo_path"]).expanduser().resolve() - - if not repo_path.exists(): - return [TextContent(type="text", text=f"Repository not found: {repo_path}")] - - manager = _load_config() - config = manager.get_config() - api_key = manager.get_api_key() - - from codewiki.src.config import Config as BackendConfig, set_cli_context - set_cli_context(True) - - # Create a minimal backend config (no LLM calls needed for analysis) - backend_config = BackendConfig.from_cli( - repo_path=str(repo_path), - output_dir=str(repo_path / ".codewiki_temp"), - llm_base_url=config.base_url or "http://localhost", - llm_api_key=api_key or "not-needed", - main_model=config.main_model or "unused", - cluster_model=config.cluster_model or "unused", - fallback_model=config.fallback_model or "unused", - ) - - from codewiki.src.be.dependency_analyzer import DependencyGraphBuilder - graph_builder = DependencyGraphBuilder(backend_config) - components, 
leaf_nodes = graph_builder.build_dependency_graph() - - # Aggregate statistics - languages = {} - files = set() - for comp in components.values(): - lang = getattr(comp, "language", "unknown") - languages[lang] = languages.get(lang, 0) + 1 - files.add(getattr(comp, "relative_path", "")) - - result = { - "status": "success", - "repo_path": str(repo_path), - "total_components": len(components), - "total_files": len(files), - "leaf_nodes": len(leaf_nodes), - "languages": languages, - "sample_components": sorted(list(components.keys()))[:20], - } - return [TextContent(type="text", text=json.dumps(result, indent=2))] - - -async def _handle_get_module_tree(arguments: dict[str, Any]) -> list[TextContent]: - """Handle get_module_tree tool call — returns existing module tree.""" +async def _legacy_get_module_tree(arguments: dict[str, Any]) -> list[TextContent]: + """Legacy get_module_tree.""" repo_path = Path(arguments["repo_path"]).expanduser().resolve() output_dir = Path(arguments.get("output_dir", "docs")).expanduser().resolve() module_tree_path = output_dir / "module_tree.json" if not module_tree_path.exists(): - return [TextContent( - type="text", - text=f"Module tree not found at {module_tree_path}. Run 'codewiki generate' first." - )] + return [_text(json.dumps({ + "error": f"Module tree not found at {module_tree_path}. Run 'codewiki generate' first." 
+ }))] module_tree = json.loads(module_tree_path.read_text()) def _summarize_tree(tree, depth=0): - """Create a readable summary of the module tree.""" lines = [] for name, info in tree.items(): indent = " " * depth @@ -299,13 +544,29 @@ def _summarize_tree(tree, depth=0): "total_modules": len(module_tree), "tree_summary": summary, } - return [TextContent(type="text", text=json.dumps(result, indent=2))] + return [_text(json.dumps(result, indent=2))] + + +# =================================================================== +# Helpers +# =================================================================== +def _text(content: str) -> TextContent: + return TextContent(type="text", text=content) + + +# =================================================================== +# Entry point +# =================================================================== async def main(): """Run the MCP server with stdio transport.""" async with stdio_server() as (read_stream, write_stream): - await server.run(read_stream, write_stream, server.create_initialization_options()) + await server.run( + read_stream, + write_stream, + server.create_initialization_options(), + ) if __name__ == "__main__": diff --git a/codewiki/mcp/session.py b/codewiki/mcp/session.py new file mode 100644 index 00000000..ca7d8f53 --- /dev/null +++ b/codewiki/mcp/session.py @@ -0,0 +1,92 @@ +"""Session state management for the CodeWiki MCP Server. + +Each ``analyze_repo`` call creates a new session that caches the analysis +results (components, leaf nodes, etc.) in memory. Subsequent tool calls +reference the session by ``session_id`` to read code, write docs, and +manage the module tree without re-parsing the repository. 
@dataclass
class SessionState:
    """Cached analysis results and bookkeeping for one MCP session.

    An instance is handed to every tool call that references the session,
    so changes made by one call are visible to the following ones.
    """

    # Identifier the client passes back on subsequent tool calls.
    session_id: str
    # Repository that was analyzed and the directory docs are written to.
    repo_path: str
    output_dir: str
    # Analysis output: component id -> node, plus the ids of leaf nodes.
    components: Dict[str, Node]
    leaf_nodes: List[str]
    # Per-session mutable state; each instance gets its own empty dict.
    module_tree: Dict[str, Any] = field(default_factory=dict)
    registry: Dict[str, Any] = field(default_factory=dict)
    # Wall-clock timestamps (``time.time()``) taken at construction.
    created_at: float = field(default_factory=time.time)
    last_accessed: float = field(default_factory=time.time)

    def touch(self) -> None:
        """Record "now" as the moment of last use, restarting the idle clock."""
        self.last_accessed = time.time()

    @property
    def is_expired(self) -> bool:
        """Whether the session has been idle for longer than the TTL."""
        idle_seconds = time.time() - self.last_accessed
        return idle_seconds > _SESSION_TTL_SECONDS
a session. Returns True if it existed.""" + return self._sessions.pop(session_id, None) is not None + + def _purge_expired(self) -> None: + """Remove all expired sessions.""" + expired = [sid for sid, s in self._sessions.items() if s.is_expired] + for sid in expired: + del self._sessions[sid] diff --git a/codewiki/mcp/tools/__init__.py b/codewiki/mcp/tools/__init__.py new file mode 100644 index 00000000..62ea687d --- /dev/null +++ b/codewiki/mcp/tools/__init__.py @@ -0,0 +1,6 @@ +"""CodeWiki MCP Tools package. + +Each module in this package implements one or more MCP tools that operate +on a :class:`~codewiki.mcp.session.SessionState`. The tools are registered +by the MCP server in ``codewiki/mcp/server.py``. +""" diff --git a/codewiki/mcp/tools/analysis.py b/codewiki/mcp/tools/analysis.py new file mode 100644 index 00000000..4e29b516 --- /dev/null +++ b/codewiki/mcp/tools/analysis.py @@ -0,0 +1,324 @@ +"""MCP tool: analyze_repo — parse a repository and build the dependency graph. + +This is the entry-point tool for the IDE-driven wiki generation pipeline. +It runs CodeWiki's Tree-sitter-based dependency analyzer (no LLM needed), +caches the results in a new session, and returns a component index the IDE +agent can use for clustering and documentation. +""" + +from __future__ import annotations + +import json +import logging +import os +import time +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +from codewiki.mcp.session import SessionState, SessionStore + +logger = logging.getLogger(__name__) + + +def _build_component_index(components: Dict[str, Any], max_items: int = 500) -> Tuple[list, bool]: + """Build a lightweight component index for the MCP response. + + Returns (index_list, truncated) where *truncated* is True when the + index was capped at *max_items*. 
+ """ + index: list[dict] = [] + for comp_id, node in list(components.items())[:max_items]: + index.append({ + "id": comp_id, + "type": getattr(node, "component_type", "unknown"), + "file": getattr(node, "relative_path", ""), + "depends_on": list(getattr(node, "depends_on", []))[:20], + }) + return index, len(components) > max_items + + +# --------------------------------------------------------------------------- +# Incremental update: detect changes since last generation +# --------------------------------------------------------------------------- + +def _detect_changes( + repo_path: Path, + output_dir: Path, +) -> Optional[Dict[str, Any]]: + """Detect changes since last documentation generation. + + Returns a changes dict with affected modules, or None if no previous + generation exists (first run). + + Detection strategy: + 1. Git-based: compare stored commit_id with current HEAD, plus check + uncommitted changes via ``git status``. + 2. Fallback: compare file mtime with stored ``timestamp`` in metadata. + """ + metadata_path = output_dir / "metadata.json" + module_tree_path = output_dir / "module_tree.json" + + if not metadata_path.exists() or not module_tree_path.exists(): + return None + + try: + metadata = json.loads(metadata_path.read_text()) + module_tree = json.loads(module_tree_path.read_text()) + except (json.JSONDecodeError, OSError): + return None + + # Try git-based detection first + changes = _detect_via_git(repo_path, metadata) + + # Fallback to mtime-based detection + if changes is None: + changes = _detect_via_mtime(repo_path, metadata) + + if changes is None: + return None + + changed_files = changes["changed_files"] + if not changed_files: + return { + "has_previous": True, + "no_changes": True, + "method": changes.get("method", "unknown"), + "message": "No changes detected since last generation. 
Documentation is up to date.", + } + + affected, cascade = _find_affected_modules(module_tree, changed_files) + + return { + "has_previous": True, + "no_changes": False, + "method": changes.get("method", "unknown"), + "changed_files": changed_files[:50], + "affected_modules": sorted(affected), + "cascade_modules": sorted(cascade), + "hint": ( + f"Only {len(affected)} module(s) need updating: {sorted(affected)}. " + f"Parent modules to refresh: {sorted(cascade)}. " + "Use edit_doc_file for targeted updates, write_doc_file for new modules." + ), + } + + +def _detect_via_git( + repo_path: Path, + metadata: Dict[str, Any], +) -> Optional[Dict[str, Any]]: + """Detect changes via git. Returns None if not in a git repo. + + Checks both committed changes (diff against stored commit_id) and + uncommitted changes (``git status``). + """ + try: + import git + repo = git.Repo(repo_path, search_parent_directories=True) + except Exception: + return None + + prev_commit = metadata.get("generation_info", {}).get("commit_id") + try: + current_commit = repo.head.commit.hexsha + except Exception: + return None + + changed: list[str] = [] + method = "git" + + # 1) Committed changes since last generation + if prev_commit and prev_commit != current_commit: + try: + diff_index = repo.commit(prev_commit).diff(current_commit) + seen: set[str] = set() + for diff in diff_index: + if diff.a_path and diff.a_path not in seen: + changed.append(diff.a_path) + seen.add(diff.a_path) + if diff.b_path and diff.b_path not in seen: + changed.append(diff.b_path) + seen.add(diff.b_path) + except Exception: + pass + + # 2) Uncommitted changes (user may have edited but not committed) + try: + for item in repo.untracked_files: + if item not in changed: + changed.append(item) + for file_path in [d.a_path for d in repo.index.diff(None)]: + if file_path and file_path not in changed: + changed.append(file_path) + except Exception: + pass + + return {"changed_files": changed, "method": method} + + +def 
_detect_via_mtime( + repo_path: Path, + metadata: Dict[str, Any], +) -> Optional[Dict[str, Any]]: + """Fallback: detect changed files by comparing mtime with generation timestamp.""" + timestamp_str = metadata.get("generation_info", {}).get("timestamp") + if not timestamp_str: + return None + + try: + from datetime import datetime + prev_time = datetime.fromisoformat(timestamp_str).timestamp() + except (ValueError, TypeError): + return None + + # Language extensions recognized by CodeWiki + source_extensions = { + ".py", ".java", ".js", ".jsx", ".ts", ".tsx", + ".c", ".h", ".cpp", ".hpp", ".cc", ".hh", + ".cs", ".kt", ".kts", + } + + changed: list[str] = [] + for dirpath, dirnames, filenames in os.walk(repo_path): + # Skip hidden dirs and common non-source dirs + dirnames[:] = [ + d for d in dirnames + if not d.startswith(".") and d not in ("node_modules", "__pycache__", "venv", ".venv") + ] + for filename in filenames: + filepath = Path(dirpath) / filename + if filepath.suffix.lower() not in source_extensions: + continue + try: + if filepath.stat().st_mtime > prev_time: + rel_path = str(filepath.relative_to(repo_path)) + changed.append(rel_path) + except OSError: + continue + + return {"changed_files": changed, "method": "mtime"} + + +def _find_affected_modules( + module_tree: Dict[str, Any], + changed_files: List[str], +) -> Tuple[set, set]: + """Map changed files to affected modules using module_tree.json. + + Uses substring matching (same as the CLI ``_invalidate_affected_modules``). + Returns (affected_modules, cascade_parent_modules). 
+ """ + affected: set[str] = set() + cascade: set[str] = set() + + def _walk(tree: Dict, parents: list[str] | None = None): + if parents is None: + parents = [] + for mod_name, mod_info in tree.items(): + components = mod_info.get("components", []) + hit = False + for comp in components: + if any(cf in comp or comp in cf for cf in changed_files): + hit = True + break + if hit: + affected.add(mod_name) + cascade.update(parents) + + children = mod_info.get("children", {}) + if isinstance(children, dict) and children: + _walk(children, parents + [mod_name]) + + _walk(module_tree) + + # overview.md depends on all child docs, always refresh if anything changed + if affected: + cascade.add("overview") + + return affected, cascade + + +def handle_analyze_repo( + arguments: Dict[str, Any], + store: SessionStore, +) -> str: + """Run the dependency analysis and return the session + component index.""" + repo_path = Path(arguments["repo_path"]).expanduser().resolve() + if not repo_path.exists(): + return json.dumps({"error": f"Repository not found: {repo_path}"}) + + output_dir = Path(arguments.get("output_dir", str(repo_path / "docs"))).expanduser().resolve() + output_dir.mkdir(parents=True, exist_ok=True) + + # Build a minimal Config for the dependency analyzer (no LLM fields used) + from codewiki.src.config import Config + config = Config( + repo_path=str(repo_path), + output_dir=str(output_dir / "temp"), + dependency_graph_dir=str(output_dir / "temp" / "dependency_graphs"), + docs_dir=str(output_dir), + max_depth=2, + llm_base_url="not-needed", + llm_api_key="not-needed", + main_model="unused", + cluster_model="unused", + ) + + # Apply optional include/exclude patterns + include = arguments.get("include_patterns") + exclude = arguments.get("exclude_patterns") + if include or exclude: + agent_instructions: Dict[str, Any] = {} + if include: + agent_instructions["include_patterns"] = [p.strip() for p in include.split(",")] + if exclude: + 
agent_instructions["exclude_patterns"] = [p.strip() for p in exclude.split(",")] + config.agent_instructions = agent_instructions + + from codewiki.src.be.dependency_analyzer import DependencyGraphBuilder + builder = DependencyGraphBuilder(config) + components, leaf_nodes = builder.build_dependency_graph() + + session = store.create( + repo_path=str(repo_path), + output_dir=str(output_dir), + components=components, + leaf_nodes=leaf_nodes, + ) + + index, truncated = _build_component_index(components) + + # Language stats + languages: Dict[str, int] = {} + for node in components.values(): + lang = getattr(node, "language", "unknown") + languages[lang] = languages.get(lang, 0) + 1 + + # Incremental update: detect changes since last generation + changes = _detect_changes(repo_path, output_dir) + + result = { + "session_id": session.session_id, + "repo_name": repo_path.name, + "repo_path": str(repo_path), + "output_dir": str(output_dir), + "languages": languages, + "total_components": len(components), + "total_leaf_nodes": len(leaf_nodes), + "leaf_nodes": leaf_nodes[:100], + "component_index": index, + "component_index_truncated": truncated, + "changes": changes, + "hint": ( + "Use read_code_components(session_id, component_ids) to read source code. " + "Use save_module_tree(session_id, module_tree) after clustering. " + "Call get_prompt('cluster') for clustering rules." + ), + } + if changes and not changes.get("no_changes"): + result["hint"] = ( + "Incremental update detected. Only update affected modules listed in " + "'changes.affected_modules'. Use edit_doc_file for targeted updates. " + "Refresh cascade parent modules in 'changes.cascade_modules'." + ) + return json.dumps(result, indent=2, ensure_ascii=False) diff --git a/codewiki/mcp/tools/code_reader.py b/codewiki/mcp/tools/code_reader.py new file mode 100644 index 00000000..5bce49bd --- /dev/null +++ b/codewiki/mcp/tools/code_reader.py @@ -0,0 +1,107 @@ +"""MCP tools: read_code_components + view_repo_file. 
def handle_view_repo_file(
    arguments: Dict[str, Any],
    store: SessionStore,
) -> str:
    """Read-only view of a file or directory inside the repository.

    Args:
        arguments: Tool arguments. Required keys: ``session_id`` and
            ``path`` (relative to the session's repository root). Optional
            ``view_range``: ``[start, end]`` 1-based inclusive line numbers,
            where ``end == -1`` means "to the end of the file".
        store: Session store used to look up the session.

    Returns:
        For a directory, a listing two levels deep that omits hidden
        entries. For a file, its content with line numbers (the whole-file
        view is truncated by ``_maybe_truncate``). On failure, a JSON
        string of the form ``{"error": ...}``.
    """
    session_id = arguments["session_id"]
    session = store.get(session_id)
    if session is None:
        return json.dumps({"error": f"Session {session_id} not found or expired."})

    rel_path = arguments["path"]
    repo_root = Path(session.repo_path).resolve()
    abs_path = (repo_root / rel_path).resolve()

    # The path comes from the tool caller: refuse anything that resolves
    # outside the repository ("..", absolute paths, escaping symlinks).
    try:
        abs_path.relative_to(repo_root)
    except ValueError:
        return json.dumps({"error": f"Path is outside the repository: {rel_path}"})

    if not abs_path.exists():
        return json.dumps({"error": f"Path not found: {rel_path}"})

    # Directory listing, done in-process: no shell is involved, so the path
    # can never be interpreted as a command, and it works on every platform.
    if abs_path.is_dir():
        prefix = rel_path.rstrip("/")
        entries = []
        for dirpath, dirnames, filenames in os.walk(abs_path):
            # Hidden entries are neither listed nor descended into.
            dirnames[:] = [d for d in dirnames if not d.startswith(".")]
            rel_dir = os.path.relpath(dirpath, abs_path)
            depth = 0 if rel_dir == "." else rel_dir.count(os.sep) + 1
            visible = dirnames + [f for f in filenames if not f.startswith(".")]
            for name in visible:
                child = name if depth == 0 else f"{rel_dir.replace(os.sep, '/')}/{name}"
                entries.append(f"{prefix}/{child}" if prefix else child)
            if depth >= 1:
                # Entries just listed are already two levels down; stop here.
                dirnames[:] = []
        listing = "\n".join([rel_path, *sorted(entries)]) + "\n"
        return f"Directory listing for {rel_path}:\n{listing}"

    # File view
    try:
        content = abs_path.read_text(encoding="utf-8", errors="replace")
    except OSError as e:
        return json.dumps({"error": f"Cannot read file: {e}"})

    view_range = arguments.get("view_range")
    lines = content.split("\n")

    if view_range:
        well_formed = (
            isinstance(view_range, (list, tuple))
            and len(view_range) == 2
            and all(isinstance(bound, int) for bound in view_range)
        )
        if not well_formed:
            return json.dumps({"error": "view_range must be [start, end]"})
        start, end = view_range
        # Clamp both bounds into the file so out-of-range requests still
        # return something sensible instead of an empty slice.
        start = max(1, min(start, len(lines)))
        if end == -1:
            end = len(lines)
        end = max(start, min(end, len(lines)))
        selected = lines[start - 1 : end]
        numbered = "\n".join(f"{i + start:6}\t{line}" for i, line in enumerate(selected))
        return f"File: {rel_path} (lines {start}-{end})\n{numbered}"

    numbered = "\n".join(f"{i + 1:6}\t{line}" for i, line in enumerate(lines))
    return _maybe_truncate(f"File: {rel_path} ({len(lines)} lines)\n{numbered}")
async def handle_write_doc_file(
    arguments: Dict[str, Any],
    store: SessionStore,
) -> str:
    """Create a new documentation file in the session's output directory.

    Args:
        arguments: Tool arguments. Required keys: ``session_id``,
            ``filename`` (``.md`` is appended when missing; may contain
            sub-directories) and ``content``.
        store: Session store used to look up the session.

    Returns:
        A JSON string. On success it carries ``status``, ``path``,
        ``filename``, ``lines`` and ``mermaid_validation``. Otherwise it is
        ``{"error": ...}`` (unknown session, file name escaping the output
        directory, or file already present).
    """
    session_id = arguments["session_id"]
    session = store.get(session_id)
    if session is None:
        return json.dumps({"error": f"Session {session_id} not found or expired."})

    filename = arguments["filename"]
    if not filename.endswith(".md"):
        filename += ".md"
    content = arguments["content"]

    # The file name comes from the tool caller: make sure it cannot point
    # outside the output directory via ".." or an absolute path.
    output_root = Path(session.output_dir).resolve()
    doc_path = (output_root / filename).resolve()
    try:
        doc_path.relative_to(output_root)
    except ValueError:
        return json.dumps({
            "error": f"Invalid filename (outside the output directory): {filename}"
        })

    if doc_path.exists():
        return json.dumps({
            "error": f"File already exists: {filename}. Use edit_doc_file to modify it."
        })

    # Directories are only created once we know the write will go ahead, so
    # a refused request leaves no empty folders behind.
    _ensure_parent_dirs(doc_path)
    doc_path.write_text(content, encoding="utf-8")

    # Mermaid validation
    mermaid_result = await _validate_mermaid(str(doc_path), filename)

    result = {
        "status": "created",
        "path": str(doc_path),
        "filename": filename,
        "lines": content.count("\n") + 1,
        "mermaid_validation": mermaid_result,
    }
    return json.dumps(result, indent=2, ensure_ascii=False)
Use write_doc_file to create it."}) + + # Save current content to history before editing + current_content = doc_path.read_text(encoding="utf-8") + history_key = str(doc_path) + history = session.registry.get("file_history", "{}") + file_history = json.loads(history) if isinstance(history, str) else history + file_history.setdefault(history_key, []).append(current_content) + session.registry["file_history"] = json.dumps(file_history) + + if command == "str_replace": + old_str = arguments.get("old_str") + new_str = arguments.get("new_str", "") + if old_str is None: + return json.dumps({"error": "old_str is required for str_replace."}) + + occurrences = current_content.count(old_str) + if occurrences == 0: + return json.dumps({"error": f"old_str not found in {filename}."}) + if occurrences > 1: + return json.dumps({"error": f"old_str appears {occurrences} times in {filename}. Make it unique."}) + + new_content = current_content.replace(old_str, new_str, 1) + doc_path.write_text(new_content, encoding="utf-8") + + # Snippet around the edit + replacement_line = current_content.split(old_str)[0].count("\n") + lines = new_content.split("\n") + start = max(0, replacement_line - 4) + end = min(len(lines), replacement_line + new_str.count("\n") + 5) + snippet = "\n".join(f"{i + start + 1:6}\t{lines[i]}" for i in range(start, end)) + + elif command == "insert": + insert_line = arguments.get("insert_line", 0) + new_str = arguments.get("new_str", "") + if not new_str: + return json.dumps({"error": "new_str is required for insert."}) + + lines = current_content.split("\n") + insert_line = max(0, min(insert_line, len(lines))) + new_str_lines = new_str.split("\n") + lines = lines[:insert_line] + new_str_lines + lines[insert_line:] + new_content = "\n".join(lines) + doc_path.write_text(new_content, encoding="utf-8") + + start = max(0, insert_line - 4) + end = min(len(lines), insert_line + len(new_str_lines) + 4) + snippet = "\n".join(f"{i + start + 1:6}\t{lines[i]}" for i in 
range(start, end)) + + else: + return json.dumps({"error": f"Unknown command: {command}. Use str_replace, insert, or undo."}) + + # Mermaid validation + mermaid_result = await _validate_mermaid(str(doc_path), filename) + + result = { + "status": "edited", + "command": command, + "filename": filename, + "snippet": snippet, + "mermaid_validation": mermaid_result, + } + return json.dumps(result, indent=2, ensure_ascii=False) diff --git a/codewiki/mcp/tools/module_tree.py b/codewiki/mcp/tools/module_tree.py new file mode 100644 index 00000000..8d7fa3a1 --- /dev/null +++ b/codewiki/mcp/tools/module_tree.py @@ -0,0 +1,133 @@ +"""MCP tools: save_module_tree + get_processing_order. + +The IDE agent decides how to group components into modules (clustering) +using its own LLM. These tools persist that decision and compute the +leaf-first processing order for documentation generation. +""" + +from __future__ import annotations + +import json +import logging +import os +from pathlib import Path +from typing import Any, Dict, List, Tuple + +from codewiki.mcp.session import SessionState, SessionStore +from codewiki.src.config import FIRST_MODULE_TREE_FILENAME, MODULE_TREE_FILENAME + +logger = logging.getLogger(__name__) + + +def _get_processing_order(module_tree: Dict[str, Any], parent_path: List[str] = []) -> List[Dict[str, Any]]: + """Compute leaf-first processing order from a module tree. + + Returns a list of dicts with module path, name, leaf status, and + component/children info. 
+ """ + order: List[Dict[str, Any]] = [] + + def _collect(tree: Dict[str, Any], path: List[str]) -> None: + for module_name, module_info in tree.items(): + current_path = path + [module_name] + children = module_info.get("children", {}) + has_children = isinstance(children, dict) and len(children) > 0 + + if has_children: + _collect(children, current_path) + order.append({ + "module": module_name, + "path": current_path, + "is_leaf": False, + "children": list(children.keys()), + "components": module_info.get("components", []), + }) + else: + order.append({ + "module": module_name, + "path": current_path, + "is_leaf": True, + "components": module_info.get("components", []), + }) + + _collect(module_tree, parent_path) + return order + + +def handle_save_module_tree( + arguments: Dict[str, Any], + store: SessionStore, +) -> str: + """Persist the IDE agent's clustering result as the module tree.""" + session_id = arguments["session_id"] + session = store.get(session_id) + if session is None: + return json.dumps({"error": f"Session {session_id} not found or expired."}) + + module_tree = arguments["module_tree"] + output_dir = session.output_dir + + # Save both immutable snapshot and mutable working copy + first_path = os.path.join(output_dir, FIRST_MODULE_TREE_FILENAME) + working_path = os.path.join(output_dir, MODULE_TREE_FILENAME) + + os.makedirs(output_dir, exist_ok=True) + + with open(first_path, "w", encoding="utf-8") as f: + json.dump(module_tree, f, indent=2, ensure_ascii=False) + with open(working_path, "w", encoding="utf-8") as f: + json.dump(module_tree, f, indent=2, ensure_ascii=False) + + # Cache in session + session.module_tree = module_tree + + # Compute processing order + order = _get_processing_order(module_tree) + + result = { + "status": "saved", + "module_count": len(module_tree), + "processing_order": order, + "tree_path": working_path, + "first_tree_path": first_path, + "hint": ( + "Use get_processing_order(session_id) to retrieve this order again. 
" + "Process leaf modules first (is_leaf=true), then parent modules. " + "For each leaf module: get_prompt('system_leaf') + read_code_components + write_doc_file. " + "For each parent module: get_prompt('overview_module') + write_doc_file." + ), + } + return json.dumps(result, indent=2, ensure_ascii=False) + + +def handle_get_processing_order( + arguments: Dict[str, Any], + store: SessionStore, +) -> str: + """Return the leaf-first processing order for the saved module tree.""" + session_id = arguments["session_id"] + session = store.get(session_id) + if session is None: + return json.dumps({"error": f"Session {session_id} not found or expired."}) + + # Try session cache first, then disk + module_tree = session.module_tree + if not module_tree: + tree_path = os.path.join(session.output_dir, MODULE_TREE_FILENAME) + if os.path.exists(tree_path): + with open(tree_path, encoding="utf-8") as f: + module_tree = json.load(f) + session.module_tree = module_tree + else: + return json.dumps({ + "error": "Module tree not found. Call save_module_tree first." + }) + + order = _get_processing_order(module_tree) + + result = { + "session_id": session_id, + "module_count": len(module_tree), + "order": order, + } + return json.dumps(result, indent=2, ensure_ascii=False) diff --git a/codewiki/mcp/tools/prompt_server.py b/codewiki/mcp/tools/prompt_server.py new file mode 100644 index 00000000..04fe2347 --- /dev/null +++ b/codewiki/mcp/tools/prompt_server.py @@ -0,0 +1,176 @@ +"""MCP tool: get_prompt — serve CodeWiki's prompt templates to the IDE agent. + +CodeWiki ships with carefully designed prompt templates for each stage of +the wiki generation pipeline. This tool lets the IDE agent retrieve them +(with optional variable substitution) so it can follow the same proven +methodology without needing its own copy of the prompts. 
+""" + +from __future__ import annotations + +import json +import logging +from typing import Any, Dict, Optional + +from codewiki.mcp.session import SessionStore +from codewiki.src.be.prompt_template import ( + CLUSTER_REPO_PROMPT, + CLUSTER_MODULE_PROMPT, + SYSTEM_PROMPT, + LEAF_SYSTEM_PROMPT, + USER_PROMPT, + REPO_OVERVIEW_PROMPT, + MODULE_OVERVIEW_PROMPT, + format_system_prompt, + format_leaf_system_prompt, + format_cluster_prompt, + format_user_prompt, +) + +logger = logging.getLogger(__name__) + + +# Prompt catalog: maps prompt_type to (raw_template, usage_hint, variables_doc) +_PROMPT_CATALOG: Dict[str, Dict[str, str]] = { + "cluster": { + "description": "Prompt for grouping components into modules. The LLM receives a component list and returns a JSON module tree.", + "usage_hint": ( + "Use this prompt to cluster components into logical modules. " + "The response should contain JSON. " + "Pass the component list from analyze_repo's component_index." + ), + }, + "system_complex": { + "description": "System prompt for documenting a complex (multi-file, parent) module. Includes sub-module delegation instructions.", + "usage_hint": ( + "Use as the system prompt when generating docs for a parent module. " + "The agent should create {module_name}.md with architecture overview " + "and cross-references to sub-module docs." + ), + }, + "system_leaf": { + "description": "System prompt for documenting a leaf (single-file or simple) module.", + "usage_hint": ( + "Use as the system prompt when generating docs for a leaf module. " + "The agent should create {module_name}.md with detailed documentation " + "including Mermaid diagrams." + ), + }, + "user": { + "description": "User prompt template that provides the module tree and core component source code.", + "usage_hint": ( + "Use as the user/assistant prompt alongside system_leaf or system_complex. " + "It provides the module tree context and the actual source code of core components." 
+ ), + }, + "overview_module": { + "description": "Prompt for generating a parent module overview from its children's documentation.", + "usage_hint": ( + "Use this after all child modules are documented. " + "Provide the module tree with children's docs embedded. " + "The response should be wrapped in tags." + ), + }, + "overview_repo": { + "description": "Prompt for generating the final repository overview.", + "usage_hint": ( + "Use this as the LAST step after all modules are documented. " + "Provide the full module tree with child docs. " + "Save the result as overview.md." + ), + }, +} + + +def handle_get_prompt( + arguments: Dict[str, Any], + store: SessionStore, +) -> str: + """Return a prompt template, optionally with variables filled in.""" + prompt_type = arguments["prompt_type"] + variables = arguments.get("variables", {}) + + if prompt_type not in _PROMPT_CATALOG: + available = list(_PROMPT_CATALOG.keys()) + return json.dumps({ + "error": f"Unknown prompt_type: {prompt_type}", + "available_types": available, + }) + + catalog_entry = _PROMPT_CATALOG[prompt_type] + + # Resolve the prompt content + content = _resolve_prompt(prompt_type, variables) + + result = { + "prompt_type": prompt_type, + "description": catalog_entry["description"], + "usage_hint": catalog_entry["usage_hint"], + "content": content, + } + return json.dumps(result, indent=2, ensure_ascii=False) + + +def _resolve_prompt(prompt_type: str, variables: Dict[str, Any]) -> str: + """Resolve a prompt template with optional variable substitution.""" + + if prompt_type == "cluster": + potential_core_components = variables.get("potential_core_components", "") + module_tree = variables.get("module_tree", {}) + module_name = variables.get("module_name", None) + return format_cluster_prompt( + potential_core_components=potential_core_components, + module_tree=module_tree, + module_name=module_name, + ) + + elif prompt_type == "system_complex": + module_name = variables.get("module_name", 
"MODULE_NAME") + custom_instructions = variables.get("custom_instructions", None) + return format_system_prompt(module_name, custom_instructions) + + elif prompt_type == "system_leaf": + module_name = variables.get("module_name", "MODULE_NAME") + custom_instructions = variables.get("custom_instructions", None) + return format_leaf_system_prompt(module_name, custom_instructions) + + elif prompt_type == "user": + # If full variables are provided, use the full formatter + session_id = variables.get("session_id") + module_name = variables.get("module_name", "MODULE_NAME") + core_component_ids = variables.get("core_component_ids", []) + module_tree = variables.get("module_tree", {}) + + if session_id and core_component_ids: + # Try to resolve from session + from codewiki.mcp.session import SessionStore + # We can't easily access the store here, so fall back to template + pass + + # Return the template with placeholders filled as possible + return USER_PROMPT.format( + module_name=module_name, + module_tree=json.dumps(module_tree, indent=2) if module_tree else "", + formatted_core_component_codes=variables.get( + "formatted_core_component_codes", + "" + ), + ) + + elif prompt_type == "overview_module": + module_name = variables.get("module_name", "MODULE_NAME") + repo_structure = variables.get("repo_structure", "") + return MODULE_OVERVIEW_PROMPT.format( + module_name=module_name, + repo_structure=repo_structure if isinstance(repo_structure, str) else json.dumps(repo_structure, indent=4), + ) + + elif prompt_type == "overview_repo": + repo_name = variables.get("repo_name", "REPO_NAME") + repo_structure = variables.get("repo_structure", "") + return REPO_OVERVIEW_PROMPT.format( + repo_name=repo_name, + repo_structure=repo_structure if isinstance(repo_structure, str) else json.dumps(repo_structure, indent=4), + ) + + return f"Unknown prompt type: {prompt_type}" diff --git a/pyproject.toml b/pyproject.toml index b618d572..e4126004 100644 --- a/pyproject.toml +++ 
b/pyproject.toml @@ -101,7 +101,9 @@ packages = [ "codewiki.src.be.dependency_analyzer.analyzers", "codewiki.src.be.dependency_analyzer.models", "codewiki.src.be.dependency_analyzer.utils", - "codewiki.src.fe" + "codewiki.src.fe", + "codewiki.mcp", + "codewiki.mcp.tools" ] [tool.setuptools.package-data] diff --git a/skill/codewiki-wiki-generator/SKILL.md b/skill/codewiki-wiki-generator/SKILL.md new file mode 100644 index 00000000..4a8ac2ff --- /dev/null +++ b/skill/codewiki-wiki-generator/SKILL.md @@ -0,0 +1,174 @@ +--- +name: codewiki-wiki-generator +description: "使用 CodeWiki-CN MCP 工具为代码仓库生成 Wiki 文档。当用户要求生成 Wiki、代码文档、仓库文档或分析代码库结构时使用此技能。需要已配置 CodeWiki-CN MCP 服务器。" +version: 1.0.0 +--- + +# CodeWiki 文档生成器 + +你是一位代码文档生成专家。使用 CodeWiki-CN 的 MCP 工具为代码仓库生成全面的 Wiki 文档。所有 9 个工具均**无需配置 LLM**——你提供全部智能推理能力,CodeWiki 提供工具链。 + +## 前置条件 + +开始前,确认 CodeWiki MCP 服务器可用。MCP 工具列表中应包含以下 9 个工具:`analyze_repo`、`read_code_components`、`view_repo_file`、`write_doc_file`、`edit_doc_file`、`save_module_tree`、`get_processing_order`、`get_prompt`、`close_session`。 + +如果工具不可用,请提示用户安装并配置 CodeWiki-CN: + +```bash +git clone https://github.com/mambo-wang/CodeWiki-CN.git +cd CodeWiki-CN && pip install -e . +``` + +然后在 MCP 配置中添加: + +```json +{"mcpServers":{"codewiki":{"command":"python","args":["-m","codewiki.mcp.server"],"cwd":"/path/to/CodeWiki-CN"}}} +``` + +## 五阶段工作流程 + +严格按以下顺序执行。阶段 1 之后的所有工具调用都需要 `analyze_repo` 返回的 `session_id`。 + +### 阶段 1:分析仓库 + +调用 `analyze_repo`: + +```json +{ "repo_path": "<仓库绝对路径>", "output_dir": "<仓库路径>/repowiki" } +``` + +返回内容:`session_id`、`component_index`(组件列表,含 id/type/file/depends_on)、`leaf_nodes`、`languages`。 + +**牢记 `session_id`**——后续每一步都需要它。 + +### 阶段 2:模块聚类 + +这是最需要理解力的阶段。你需要将组件分组为逻辑模块。 + +1. **获取聚类规则**:调用 `get_prompt`,参数 `{"prompt_type": "cluster"}` +2. **阅读源码**(组件超过 50 个时):分批调用 `read_code_components`,每批 15-20 个叶节点 ID,理解各组件的功能和关联 +3. **按以下原则分组**: + - 功能内聚:关系紧密的组件放入同一模块 + - 文件归属:同一文件/目录下的组件倾向归入同一模块 + - 规模控制:通常 3-8 个顶层模块,每个模块 5-30 个组件 + - 组件 ID 必须原样保留(含 `::` 前缀) +4. 
**保存模块树**:调用 `save_module_tree`: + +```json +{ + "session_id": "", + "module_tree": { + "模块名": { + "components": ["file.py::ClassA", "file.py::func_b"], + "children": {} + } + } +} +``` + +返回结果中包含 `processing_order`——叶优先的文档生成顺序。 + +### 阶段 3:逐模块生成文档 + +按 `processing_order` 的顺序处理各模块。**先处理叶模块**,再处理父模块。 + +**每个叶模块**(is_leaf=true): + +1. 获取系统提示词:`get_prompt` → `{"prompt_type": "system_leaf", "variables": {"module_name": "<模块名>"}}` +2. 读取源码:`read_code_components` → 该模块所有组件 ID +3. 如需更多上下文,用 `view_repo_file` 补充读取 +4. 撰写文档,包含:模块简介与核心功能、架构图(至少 1 个 Mermaid 图表)、各组件职责说明、交叉引用 `[模块名](模块名.md)` +5. 保存:`write_doc_file` → `{"session_id": "...", "filename": "<模块名>.md", "content": "..."}` + +如果 Mermaid 校验失败,修正语法后用 `edit_doc_file`(`command: "str_replace"`)修改。 + +**每个父模块**(is_leaf=false): + +1. 用 `view_repo_file` 读取所有子模块已生成的 .md 文件 +2. 获取总览提示词:`get_prompt` → `{"prompt_type": "overview_module", "variables": {"module_name": "<模块名>"}}` +3. 综合子模块文档,生成父模块总览 +4. 用 `write_doc_file` 保存 + +### 阶段 4:生成仓库总览 + +1. 获取提示词:`get_prompt` → `{"prompt_type": "overview_repo", "variables": {"repo_name": "<仓库名>"}}` +2. 用 `view_repo_file` 读取所有已生成的模块文档 +3. 撰写仓库级总览,包含:项目简介、端到端架构图(Mermaid)、各模块文档的引用链接 +4. 保存:`write_doc_file` → `filename: "overview.md"` + +### 阶段 5:清理 + +调用 `close_session` → `{"session_id": ""}` 释放内存。 + +## 增量更新模式 + +当仓库已生成过文档(`output_dir` 下存在 `metadata.json` 和 `module_tree.json`),`analyze_repo` 的返回结果会包含 `changes` 字段: + +```json +{ + "changes": { + "has_previous": true, + "no_changes": false, + "method": "git", + "changed_files": ["auth.py", "utils.py::hash_password"], + "affected_modules": ["认证模块"], + "cascade_modules": ["核心系统", "overview"] + } +} +``` + +**变更检测策略**:优先使用 `git diff`(对比 commit SHA + 检查工作区未提交变更),非 git 仓库回退到对比文件修改时间。 + +**增量更新流程**: + +1. 调用 `analyze_repo`,检查 `changes` 字段 +2. 如果 `no_changes: true`,告知用户文档已是最新,无需操作 +3. 如果 `no_changes: false`,**只更新 `affected_modules` 中列出的模块**: + - 用 `read_code_components` 读取变更组件的新源码 + - 用 `edit_doc_file`(`str_replace`)局部修改对应文档,而非整篇重写 +4. 
对 `cascade_modules` 中的父模块,读取已更新的子文档后同步刷新总览 +5. 最后更新 `overview.md` + +增量更新的粒度是**模块级**——一个模块内任一组件变更,该模块文档需要更新。相比全量生成,增量更新通常只需处理 1-3 个模块。 + +## 工具速查表 + +| 工具 | 用途 | +|------|------| +| `analyze_repo` | 分析仓库,构建依赖图,返回组件索引 | +| `read_code_components` | 根据组件 ID 读取源码(格式:`文件::名称`) | +| `view_repo_file` | 只读浏览仓库文件/目录 | +| `write_doc_file` | 创建 .md 文档(自动 Mermaid 校验) | +| `edit_doc_file` | 编辑文档:`str_replace` / `insert` / `undo` | +| `save_module_tree` | 保存模块聚类结果 | +| `get_processing_order` | 获取叶优先的处理顺序 | +| `get_prompt` | 获取提示词模板:`cluster`、`system_leaf`、`system_complex`、`user`、`overview_module`、`overview_repo` | +| `close_session` | 关闭会话释放资源(2 小时自动过期) | + +## 文档质量标准 + +- **语言**:默认中文撰写(除非用户指定其他语言) +- **Mermaid 图表**:每个模块至少 1 个架构图,优先使用 `graph TD` 或 `graph LR` +- **交叉引用**:引用其他模块时使用 `[模块名](模块名.md)` 格式 +- **代码示例**:关键函数/类展示签名和简要用法 +- **篇幅**:叶模块文档 200-500 行,父模块总览 100-300 行,仓库总览 80-200 行 + +## Mermaid 语法规范 + +```mermaid +graph TD + A[组件A] --> B[组件B] + A --> C[组件C] +``` + +- 节点 ID 仅使用字母和数字(避免中文、空格、冒号) +- 节点标签用方括号包裹:`A[显示文本]` +- 子图语法:`subgraph 标题 ... end` +- 禁止使用 `click`、`linkStyle` 等交互语法 + +## 错误处理 + +- **Mermaid 校验失败**:工具会返回校验错误信息,修正语法后用 `edit_doc_file` + `str_replace` 重试 +- **会话过期**(2 小时超时):重新调用 `analyze_repo` 创建新会话 +- **大型仓库(>10 万行)**:`analyze_repo` 可能需要约 30 秒,可通过 `include_patterns`/`exclude_patterns` 缩小分析范围 +- **组件 ID 格式**:始终使用 `component_index` 中的原始 ID(如 `src/main.py::MyClass`),保留 `::` 分隔符