- Overview
- API Layer Components
- Service Layer Components
- Storage Components
- Utility Components
- MCP Server Components
- Component Dependencies
The Code Graph Knowledge System consists of multiple specialized components organized in layers. This document provides detailed descriptions of each component, their responsibilities, interfaces, and interactions.
graph TB
subgraph "Two-Port Architecture"
subgraph "Port 8000 (PRIMARY)"
MCP[MCP SSE Server]
MCPRoutes[SSE Routes]
end
subgraph "Port 8080 (SECONDARY)"
FastAPI[FastAPI Application]
Routes[REST API Routes]
Middleware[Middleware Stack]
WebUI[Web UI - React SPA]
end
end
subgraph "Service Layer"
KnowServ[Knowledge Service]
MemServ[Memory Store]
GraphServ[Graph Service]
TaskQ[Task Queue]
CodeIng[Code Ingestor]
MemExt[Memory Extractor]
end
subgraph "Supporting Services"
SQLParse[SQL Parser]
Ranker[Result Ranker]
PackBuild[Context Pack Builder]
GitUtil[Git Utilities]
Metrics[Metrics Service]
end
subgraph "Storage"
Neo4j[(Neo4j)]
SQLite[(SQLite)]
FileSystem[File System]
end
MCP --> MCPRoutes
MCPRoutes --> KnowServ
MCPRoutes --> MemServ
MCPRoutes --> GraphServ
FastAPI --> Routes
FastAPI --> Middleware
FastAPI --> WebUI
Routes --> KnowServ
Routes --> MemServ
Routes --> GraphServ
Routes --> TaskQ
KnowServ --> Neo4j
MemServ --> Neo4j
GraphServ --> Neo4j
TaskQ --> SQLite
CodeIng --> GraphServ
MemExt --> MemServ
style MCP fill:#2196F3
style FastAPI fill:#4CAF50
style Neo4j fill:#f9a825
The system uses a two-port architecture for clear separation of concerns:
- Port 8000 (PRIMARY): MCP SSE Server - AI-to-application communication via Server-Sent Events
- Port 8080 (SECONDARY): Web UI + REST API - Status monitoring and management interface
Both servers run in separate processes using Python's multiprocessing module and share the same service layer.
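A minimal sketch of how main.py might launch the two processes (the module paths and function names here are assumptions, not taken from the source):

```python
# Hypothetical two-process launcher; each child runs its own uvicorn server.
import multiprocessing

import uvicorn


def run_mcp_server() -> None:
    # PRIMARY: MCP SSE server (Starlette app) on port 8000
    uvicorn.run("core.mcp_sse:mcp_app", host="0.0.0.0", port=8000)


def run_web_ui() -> None:
    # SECONDARY: FastAPI Web UI + REST API on port 8080
    uvicorn.run("core.app:app", host="0.0.0.0", port=8080)


if __name__ == "__main__":
    procs = [
        multiprocessing.Process(target=run_mcp_server, name="mcp-sse"),
        multiprocessing.Process(target=run_web_ui, name="web-ui"),
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
```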
File: main.py, core/mcp_sse.py
Purpose: PRIMARY service providing AI-to-application communication via MCP protocol over SSE transport
Key Responsibilities:
- MCP protocol implementation (Server-Sent Events transport)
- Tool execution (30 MCP tools)
- AI client connection management
- Long-running streaming responses
Configuration:
from mcp.server.sse import SseServerTransport
sse_transport = SseServerTransport("/messages/")
mcp_app = Starlette(routes=[
Route("/sse", endpoint=handle_sse, methods=["GET"]),
Mount("/messages/", app=sse_transport.handle_post_message),
])
Endpoints:
- GET /sse - Establish SSE connection
- POST /messages/* - Receive client messages
Supported Clients:
- Claude Desktop
- Cline (VS Code extension)
- Any MCP-compatible AI client
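For illustration, this is roughly how a client connects over SSE with the official MCP Python SDK (the URL matches the /sse endpoint above; the rest is a generic SDK sketch):

```python
# Minimal MCP-over-SSE client sketch using the official Python SDK.
import asyncio

from mcp import ClientSession
from mcp.client.sse import sse_client


async def main() -> None:
    async with sse_client("http://localhost:8000/sse") as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await session.list_tools()
            print([tool.name for tool in tools.tools])


asyncio.run(main())
```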
File: main.py, core/app.py
Purpose: SECONDARY service providing Web UI and REST API for monitoring and management
Key Responsibilities:
- HTTP request handling (REST API)
- Route management
- Middleware processing
- Static file serving (React SPA)
- API documentation (OpenAPI/Swagger)
Configuration:
app = FastAPI(
title="Code Graph Knowledge Service",
version="1.0.0",
lifespan=lifespan, # Startup/shutdown hooks
docs_url="/docs",
redoc_url="/redoc"
)
Dependencies:
- All service layer components
- Configuration settings
- Middleware stack
- Exception handlers
Startup Sequence (Both Servers):
- Load configuration from environment (including MCP_PORT=8000, WEB_UI_PORT=8080)
- Initialize logging system
- Initialize all services via lifespan manager
- Fork into two processes:
- Process 1: MCP SSE Server on port 8000
- Process 2: Web UI + REST API on port 8080
- Each process:
- Setup middleware/routes
- Start uvicorn server
- Begin accepting connections
Shutdown Sequence:
- Stop accepting new requests (both servers)
- Stop task queue
- Close Memory Store
- Close Knowledge Service
- Close database connections
- Terminate both processes gracefully
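One plausible way to wire that graceful termination in main.py (assumed, not from the source): forward SIGTERM/SIGINT to both children and let uvicorn run the lifespan shutdown hooks.

```python
# Hypothetical signal wiring: terminating a child lets its uvicorn server
# drain connections and execute the lifespan shutdown sequence above.
import signal


def install_shutdown_handlers(procs) -> None:
    def _terminate(signum, frame):
        for p in procs:
            p.terminate()

    signal.signal(signal.SIGTERM, _terminate)
    signal.signal(signal.SIGINT, _terminate)
```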
File: core/routes.py, api/*.py
Purpose: Organize and register all API endpoints
Route Modules:
# Health check
GET /api/v1/health
# Knowledge base operations
POST /api/v1/knowledge/query
POST /api/v1/knowledge/search
POST /api/v1/documents/add
POST /api/v1/documents/file
POST /api/v1/documents/directory
# SQL parsing
POST /api/v1/sql/parse
POST /api/v1/sql/schema/upload
# Code graph
POST /api/v1/code/ingest
POST /api/v1/code/search
POST /api/v1/code/related
POST /api/v1/code/impact
POST /api/v1/code/context-pack
# Memory management
POST /api/v1/memory/add
POST /api/v1/memory/search
GET /api/v1/memory/{memory_id}
PUT /api/v1/memory/{memory_id}
DELETE /api/v1/memory/{memory_id}
POST /api/v1/memory/supersede
GET /api/v1/memory/project/{project_id}/summary
# Memory extraction (v0.7)
POST /api/v1/memory/extract/conversation
POST /api/v1/memory/extract/commit
POST /api/v1/memory/extract/comments
POST /api/v1/memory/suggest
POST /api/v1/memory/extract/batch
# Task management
GET /api/v1/tasks/{task_id}
GET /api/v1/tasks
POST /api/v1/tasks/{task_id}/cancel
GET /api/v1/queue/stats
# Server-Sent Events for real-time updates
GET /api/v1/sse/task/{task_id}
GET /api/v1/sse/tasks
GET /api/v1/sse/stats
# WebSocket connections
WS /api/v1/ws/task/{task_id}
Request/Response Models:
# Example: Document addition
class DocumentAddRequest(BaseModel):
content: str
title: str = "Untitled"
metadata: Optional[Dict[str, Any]] = None
class DocumentAddResponse(BaseModel):
success: bool
document_id: Optional[str] = None
message: str
error: Optional[str] = None
File: core/middleware.py
Purpose: Process all requests/responses with cross-cutting concerns
Middleware Components:
CORSMiddleware(
allow_origins=settings.cors_origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"]
)
Purpose: Handle cross-origin requests for web clients
GZipMiddleware(minimum_size=1000)
Purpose: Compress responses for bandwidth optimization
@app.middleware("http")
async def log_requests(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
duration = time.time() - start_time
logger.info(f"{request.method} {request.url.path} {response.status_code} {duration:.3f}s")
return response
Purpose: Log all HTTP requests with timing information
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
logger.error(f"Unhandled exception: {exc}")
return JSONResponse(
status_code=500,
content={"detail": "Internal server error"}
)
Purpose: Catch and handle all uncaught exceptions
File: core/lifespan.py
Purpose: Manage application startup and shutdown lifecycle
Initialization Sequence:
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
logger.info("Starting services...")
# 1. Initialize Neo4j Knowledge Service
await neo4j_knowledge_service.initialize()
# 2. Initialize Memory Store
await memory_store.initialize()
# 3. Initialize Task Processors
processor_registry.initialize_default_processors(neo4j_knowledge_service)
# 4. Start Task Queue
await task_queue.start()
yield
# Shutdown
await task_queue.stop()
await memory_store.close()
await neo4j_knowledge_service.close()
Design Pattern: Context manager ensures proper cleanup even on errors
File: services/neo4j_knowledge_service.py
Purpose: Primary service for knowledge graph operations using LlamaIndex
Key Capabilities:
- Document processing and chunking
- Vector embedding generation
- Knowledge graph construction
- RAG-based query answering
- Semantic similarity search
Architecture:
class Neo4jKnowledgeService:
def __init__(self):
self.graph_store = None # Neo4j graph store
self.storage_context = None # Shared storage context
self.knowledge_index = None # KnowledgeGraphIndex
self.vector_index = None # VectorStoreIndex for similarity search
self.response_synthesizer = None # LLM-backed synthesizer
self.query_pipeline = None # Graph/Vector pipeline
self.function_tools = [] # Workflow tools
self.tool_node = None # Optional ToolNode
self._initialized = False
Initialization Flow:
sequenceDiagram
participant Client
participant KnowServ as Knowledge Service
participant LlamaIndex
participant Neo4j
Client->>KnowServ: initialize()
KnowServ->>KnowServ: _create_llm()
KnowServ->>KnowServ: _create_embed_model()
KnowServ->>Neo4j: Connect via Neo4jGraphStore
Neo4j-->>KnowServ: Connection established
KnowServ->>LlamaIndex: Configure Settings
KnowServ->>LlamaIndex: Create KnowledgeGraphIndex
LlamaIndex-->>KnowServ: Index ready
KnowServ->>KnowServ: Build QueryPipeline (graph + vector + synth)
KnowServ-->>Client: Initialized
Core Methods:
async def add_document(
self,
content: str,
title: str = "Untitled",
metadata: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""Add document to knowledge graph"""
# 1. Create LlamaIndex Document
document = Document(text=content, metadata={...})
# 2. Insert into index (creates nodes, embeddings, relationships)
await asyncio.to_thread(self.knowledge_index.insert, document)
# 3. Return result with document ID
return {"success": True, "document_id": doc_id}async def query(
self,
question: str,
*,
mode: str = "hybrid",
use_tools: bool = False
) -> Dict[str, Any]:
"""Run the QueryPipeline composed of graph/vector retrievers and a synthesizer."""
config = self._resolve_pipeline_config(mode, use_tools=use_tools)
result = await asyncio.to_thread(self.query_pipeline.run, question, config)
return {
"success": True,
"answer": str(result["response"]),
"source_nodes": format_sources(result["source_nodes"]),
"pipeline_steps": result["steps"],
"tool_outputs": result["tool_outputs"]
}
Pipeline Components:
- KnowledgeGraphRAGRetriever: extracts entities and traverses the property graph.
- VectorIndexRetriever: performs vector similarity search over the Neo4j vector index.
- ResponseSynthesizer: merges retrieved context and generates the final answer.
- FunctionTool/ToolNode (optional): exposes the query as a workflow tool for multi-turn agents.
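A hypothetical end-to-end usage of the service, based on the `add_document` and `query` signatures above and the `neo4j_knowledge_service` singleton referenced in the lifespan manager:

```python
# Insert a document, then answer a question via the hybrid pipeline.
async def demo() -> None:
    added = await neo4j_knowledge_service.add_document(
        content="We chose Neo4j as the primary graph store.",
        title="Storage decision",
    )
    result = await neo4j_knowledge_service.query(
        "Which graph store does the system use?",
        mode="hybrid",
    )
    print(added["document_id"], result["answer"])
```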
async def search_similar(
self,
query: str,
top_k: int = 5
) -> Dict[str, Any]:
"""Find similar documents using vector search"""
# 1. Generate query embedding
# 2. Search Neo4j vector index
# 3. Return ranked results
LLM Provider Support:
def _create_llm(self):
provider = settings.llm_provider
if provider == "ollama":
return Ollama(model=settings.ollama_model, ...)
elif provider == "openai":
return OpenAI(model=settings.openai_model, ...)
elif provider == "gemini":
return Gemini(model=settings.gemini_model, ...)
elif provider == "openrouter":
return OpenRouter(model=settings.openrouter_model, ...)
Embedding Model Support:
def _create_embed_model(self):
provider = settings.embedding_provider
if provider == "ollama":
return OllamaEmbedding(model_name=settings.ollama_embedding_model)
elif provider == "openai":
return OpenAIEmbedding(model=settings.openai_embedding_model)
elif provider == "gemini":
return GeminiEmbedding(model_name=settings.gemini_embedding_model)
elif provider == "huggingface":
return HuggingFaceEmbedding(model_name=settings.huggingface_embedding_model)
Configuration:
# Global LlamaIndex settings
Settings.llm = self._create_llm()
Settings.embed_model = self._create_embed_model()
Settings.chunk_size = settings.chunk_size
Settings.chunk_overlap = settings.chunk_overlap
Settings.node_parser = SimpleNodeParser.from_defaults()
File: services/memory_store.py
Purpose: Persistent project knowledge management for AI agents
Memory Types:
MemoryType = Literal[
"decision", # Architecture choices, tech decisions
"preference", # Coding styles, tool preferences
"experience", # Problems and solutions
"convention", # Team rules, naming conventions
"plan", # Future improvements, TODOs
"note" # Other important information
]
Data Model:
class Memory:
id: str # Unique identifier
project_id: str # Project namespace
memory_type: MemoryType # Type of memory
title: str # Short description
content: str # Main content
reason: Optional[str] # Rationale/context
importance: float # 0.0-1.0 score
tags: List[str] # Categorization tags
created_at: datetime # Creation timestamp
updated_at: datetime # Last update
is_active: bool # Soft delete flag
superseded_by: Optional[str] # Replacement memory ID
Graph Schema:
// Nodes
(:Memory {
id: string,
project_id: string,
memory_type: string,
title: string,
content: string,
reason: string,
importance: float,
tags: [string],
created_at: datetime,
updated_at: datetime,
is_active: boolean,
superseded_by: string
})
(:Project {
id: string,
name: string,
created_at: datetime
})
// Relationships
(Memory)-[:BELONGS_TO]->(Project)
(Memory)-[:RELATES_TO]->(Memory)
(Memory)-[:SUPERSEDES]->(Memory)
Core Operations:
async def add_memory(
self,
project_id: str,
memory_type: MemoryType,
title: str,
content: str,
reason: Optional[str] = None,
importance: float = 0.5,
tags: Optional[List[str]] = None
) -> Dict[str, Any]:
"""Add new project memory"""
# 1. Generate unique ID
# 2. Create Memory node in Neo4j
# 3. Link to Project
# 4. Create fulltext indexes
# 5. Return memory details
async def search_memories(
self,
project_id: str,
query: Optional[str] = None,
memory_type: Optional[MemoryType] = None,
tags: Optional[List[str]] = None,
min_importance: float = 0.0,
limit: int = 10
) -> List[Dict[str, Any]]:
"""Search project memories with filters"""
# 1. Build Cypher query with filters
# 2. Use fulltext search if query provided
# 3. Filter by type, tags, importance
# 4. Order by relevance and importance
# 5. Return ranked results
async def supersede_memory(
self,
old_memory_id: str,
new_title: str,
new_content: str,
...
) -> Dict[str, Any]:
"""Replace old memory with new version"""
# 1. Create new memory
# 2. Mark old memory as superseded
# 3. Create SUPERSEDES relationship
# 4. Maintain history chain
Indexes:
// Constraints
CREATE CONSTRAINT memory_id_unique IF NOT EXISTS
FOR (m:Memory) REQUIRE m.id IS UNIQUE;
// Fulltext search
CREATE FULLTEXT INDEX memory_search IF NOT EXISTS
FOR (m:Memory) ON EACH [m.title, m.content, m.reason, m.tags];
File: services/graph_service.py
Purpose: Direct Neo4j graph database operations
Key Capabilities:
- Raw Cypher query execution
- Code graph management
- Schema operations
- Batch operations
- Transaction management
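For example, a parameterized call against the `execute_query` method shown below (the `graph_service` instance name comes from the initialization-order example later in this document; the result shape is an assumption):

```python
# Find callees of a function; parameters are passed separately so Neo4j
# can cache the query plan and injection is avoided.
result = await graph_service.execute_query(
    "MATCH (s:Symbol)-[:CALLS]->(t:Symbol) "
    "WHERE s.name = $name RETURN t.name AS callee",
    parameters={"name": "process_data"},
)
for record in result.records:  # assumed GraphQueryResult field
    print(record["callee"])
```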
Architecture:
class Neo4jGraphService:
def __init__(self):
self.driver = None # Neo4j driver
self._connected = False
Core Methods:
async def execute_query(
self,
cypher: str,
parameters: Optional[Dict[str, Any]] = None
) -> GraphQueryResult:
"""Execute Cypher query and return results"""
with self.driver.session(database=settings.neo4j_database) as session:
result = session.run(cypher, parameters)
return self._process_result(result)
def create_node(
self,
labels: List[str],
properties: Dict[str, Any]
) -> GraphNode:
"""Create graph node"""
cypher = f"""
CREATE (n:{':'.join(labels)})
SET n = $properties
RETURN n
"""
# Execute and return node
def create_relationship(
self,
start_node_id: str,
end_node_id: str,
relationship_type: str,
properties: Optional[Dict[str, Any]] = None
) -> GraphRelationship:
"""Create relationship between nodes"""
cypher = """
MATCH (a), (b)
WHERE a.id = $start_id AND b.id = $end_id
CREATE (a)-[r:$rel_type]->(b)
SET r = $properties
RETURN r
"""
# Execute and return relationshipCode Graph Schema:
// Repository structure
(:Repo {id: string, name: string, path: string})
(:File {repoId: string, path: string, lang: string, content: string})
(:Symbol {id: string, name: string, type: string, line: int})
// Code entities
(:Function {id: string, name: string, params: [string], returns: string})
(:Class {id: string, name: string, methods: [string]})
(:Table {id: string, name: string, columns: [string]})
// Relationships
(File)-[:BELONGS_TO]->(Repo)
(Symbol)-[:DEFINED_IN]->(File)
(Symbol)-[:CALLS]->(Symbol)
(Symbol)-[:INHERITS]->(Symbol)
(Symbol)-[:USES]->(Symbol)
File: services/task_queue.py
Purpose: Asynchronous background task processing with persistence
Design Pattern: Producer-Consumer with SQLite persistence
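A hedged sketch of the producer side, assuming the module-level `task_queue` singleton and a processor registered for the "document" task type:

```python
import asyncio


# process_document is a hypothetical task function for illustration.
async def run_job() -> None:
    task_id = await task_queue.submit_task(
        process_document,
        "/data/design.md",
        task_type="document",
    )
    # Poll the in-memory cache until the background worker finishes.
    while True:
        task = task_queue.tasks[task_id]
        if task.status in (TaskStatus.SUCCESS, TaskStatus.FAILED, TaskStatus.CANCELLED):
            break
        await asyncio.sleep(0.5)
    print(task.status, task.result or task.error)
```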
Architecture:
class TaskQueue:
def __init__(self, max_concurrent_tasks: int = 3):
self.max_concurrent_tasks = max_concurrent_tasks
self.tasks: Dict[str, TaskResult] = {} # In-memory cache
self.running_tasks: Dict[str, asyncio.Task] = {} # Active tasks
self.task_semaphore = asyncio.Semaphore(max_concurrent_tasks)
self._storage = None # SQLite storage
self._worker_id = str(uuid.uuid4()) # Worker identity
Task Lifecycle:
stateDiagram-v2
[*] --> PENDING: Task created
PENDING --> PROCESSING: Worker picks up
PROCESSING --> SUCCESS: Completed
PROCESSING --> FAILED: Error occurred
PROCESSING --> CANCELLED: User cancelled
SUCCESS --> [*]
FAILED --> [*]
CANCELLED --> [*]
note right of PROCESSING
Progress updates
sent via SSE/WebSocket
end note
Task Status:
class TaskStatus(Enum):
PENDING = "pending" # Queued, not started
PROCESSING = "processing" # Currently running
SUCCESS = "success" # Completed successfully
FAILED = "failed" # Error occurred
CANCELLED = "cancelled" # User cancelledTask Result:
@dataclass
class TaskResult:
task_id: str
status: TaskStatus
progress: float = 0.0 # 0.0 to 1.0
message: str = "" # Status message
result: Optional[Dict[str, Any]] = None # Final result
error: Optional[str] = None # Error details
created_at: datetime = field(default_factory=datetime.now)
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
metadata: Dict[str, Any] = field(default_factory=dict) # Task-specific data
Core Operations:
async def submit_task(
self,
task_func: Callable,
*args,
task_type: str = "generic",
**kwargs
) -> str:
"""Submit new task for processing"""
# 1. Generate task ID
task_id = str(uuid.uuid4())
# 2. Create TaskResult
task_result = TaskResult(
task_id=task_id,
status=TaskStatus.PENDING,
metadata={"type": task_type}
)
# 3. Store in SQLite
await self._storage.store_task(task_result)
# 4. Cache in memory
self.tasks[task_id] = task_result
# 5. Worker will pick up automatically
return task_id
async def _process_pending_tasks(self):
"""Background worker to process pending tasks"""
while True:
try:
# 1. Get pending tasks from SQLite
pending = await self._storage.get_pending_tasks(limit=10)
# 2. Process each task
for task in pending:
if len(self.running_tasks) < self.max_concurrent_tasks:
await self._execute_task(task)
# 3. Wait before next poll
await asyncio.sleep(1)
except asyncio.CancelledError:
break
async def _execute_task(self, task: TaskResult):
"""Execute single task with error handling"""
async with self.task_semaphore:
try:
# 1. Update status to PROCESSING
task.status = TaskStatus.PROCESSING
task.started_at = datetime.now()
await self._storage.update_task_status(task.task_id, task.status)
# 2. Get processor for task type
processor = processor_registry.get_processor(task.metadata["type"])
# 3. Execute processor
result = await processor.process(task)
# 4. Update status to SUCCESS
task.status = TaskStatus.SUCCESS
task.result = result
task.completed_at = datetime.now()
except Exception as e:
# Update status to FAILED
task.status = TaskStatus.FAILED
task.error = str(e)
finally:
# Save to storage
await self._storage.update_task(task)
Progress Tracking:
async def update_progress(
self,
task_id: str,
progress: float,
message: str
):
"""Update task progress"""
task = self.tasks.get(task_id)
if task:
task.progress = progress
task.message = message
await self._storage.update_task(task)
# Notify SSE/WebSocket listeners
await self._notify_listeners(task_id, task)
File: services/code_ingestor.py
Purpose: Parse and ingest code repositories into graph structure
Supported Languages:
- Python
- JavaScript/TypeScript
- Java
- Go
- C/C++
- SQL
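The source does not show how files are mapped to parsers; a plausible extension-based sketch:

```python
from pathlib import Path
from typing import Optional

# Assumed extension-to-language table; the real ingestor may detect
# languages differently.
EXTENSION_LANG = {
    ".py": "python",
    ".js": "javascript",
    ".ts": "typescript",
    ".java": "java",
    ".go": "go",
    ".c": "c",
    ".cpp": "cpp",
    ".sql": "sql",
}


def detect_language(path: str) -> Optional[str]:
    """Best-effort language detection by file extension."""
    return EXTENSION_LANG.get(Path(path).suffix.lower())
```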
Ingestion Process:
sequenceDiagram
participant Client
participant Ingestor
participant Parser
participant GraphService
participant Neo4j
Client->>Ingestor: ingest_repository(path)
Ingestor->>Ingestor: Scan directory
loop For each file
Ingestor->>Parser: parse_file(file, language)
Parser-->>Ingestor: AST + Symbols
Ingestor->>GraphService: create_file_node()
Ingestor->>GraphService: create_symbol_nodes()
Ingestor->>GraphService: create_relationships()
GraphService->>Neo4j: Cypher queries
end
Ingestor-->>Client: Ingestion complete
Core Methods:
async def ingest_repository(
self,
repo_path: str,
repo_name: Optional[str] = None
) -> Dict[str, Any]:
"""Ingest entire code repository"""
# 1. Create Repo node
# 2. Walk directory tree
# 3. Parse each file
# 4. Create graph structure
# 5. Return statistics
def parse_file(self, file_path: str, language: str) -> ParseResult:
"""Parse code file and extract symbols"""
if language == "python":
return self._parse_python(file_path)
elif language == "javascript":
return self._parse_javascript(file_path)
# ... other languages
def _parse_python(self, file_path: str) -> ParseResult:
"""Parse Python file using AST"""
import ast
with open(file_path) as f:
tree = ast.parse(f.read())
symbols = []
for node in ast.walk(tree):
if isinstance(node, ast.FunctionDef):
symbols.append({
"type": "function",
"name": node.name,
"line": node.lineno
})
elif isinstance(node, ast.ClassDef):
symbols.append({
"type": "class",
"name": node.name,
"line": node.lineno
})
return ParseResult(symbols=symbols, relationships=[])
File: services/memory_extractor.py
Purpose: Automatically extract memories from various sources (v0.7)
Extraction Sources:
- Conversation analysis
- Git commit mining
- Code comment extraction
- Query/answer analysis
- Batch repository analysis
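A hypothetical call matching the `extract_from_conversation` signature below (the `memory_extractor` instance name is an assumption):

```python
memories = await memory_extractor.extract_from_conversation(
    project_id="my-project",
    conversation=[
        {"role": "user", "content": "Let's switch from REST to gRPC."},
        {"role": "assistant", "content": "Agreed, gRPC fits the latency budget."},
    ],
    auto_save=False,  # review suggestions before persisting
)
for memory in memories:
    print(memory["memory_type"], memory["title"])
```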
Core Methods:
async def extract_from_conversation(
self,
project_id: str,
conversation: List[Dict[str, str]],
auto_save: bool = False
) -> List[Dict[str, Any]]:
"""Extract memories from AI conversation"""
# 1. Format conversation for LLM
# 2. Use LLM to identify decisions, learnings
# 3. Generate memory objects
# 4. Optionally auto-save high-confidence memories
async def extract_from_git_commit(
self,
project_id: str,
commit_sha: str,
commit_message: str,
changed_files: List[str],
auto_save: bool = False
) -> List[Dict[str, Any]]:
"""Extract memories from git commit"""
# 1. Analyze commit message
# 2. Analyze changed files
# 3. Use LLM to extract decisions/experiences
# 4. Generate memories with context
async def extract_from_code_comments(
self,
project_id: str,
file_path: str
) -> List[Dict[str, Any]]:
"""Mine TODO, FIXME, NOTE markers"""
# 1. Parse file for comment markers
# 2. Extract context around markers
# 3. Classify as plan/note/experience
# 4. Generate memory objects
File: services/task_processors.py
Purpose: Implement specific task processing logic
Processor Registry:
class ProcessorRegistry:
def __init__(self):
self.processors: Dict[str, TaskProcessor] = {}
def register(self, task_type: str, processor: TaskProcessor):
"""Register processor for task type"""
self.processors[task_type] = processor
def get_processor(self, task_type: str) -> TaskProcessor:
"""Get processor for task type"""
return self.processors.get(task_type)
Built-in Processors:
class DocumentProcessor(TaskProcessor):
async def process(self, task: TaskResult) -> Dict[str, Any]:
"""Process document ingestion task"""
# 1. Read document from file/content
# 2. Call knowledge service
# 3. Update progress
# 4. Return result
class DirectoryProcessor(TaskProcessor):
async def process(self, task: TaskResult) -> Dict[str, Any]:
"""Process batch directory ingestion"""
# 1. List files in directory
# 2. Filter by patterns
# 3. Process each file
# 4. Update progress incrementally
# 5. Return summary
class CodeIngestionProcessor(TaskProcessor):
async def process(self, task: TaskResult) -> Dict[str, Any]:
"""Process code repository ingestion"""
# 1. Call code ingestor
# 2. Track progress per file
# 3. Return ingestion statistics
Purpose: Primary storage for all graph data
Node Types:
// Knowledge graph
:Document, :Entity, :Chunk
// Memory store
:Memory, :Project
// Code graph
:Repo, :File, :Symbol, :Function, :Class, :Table
// SQL schema
:Database, :Table, :Column
Indexes:
// Constraints
CREATE CONSTRAINT FOR (d:Document) REQUIRE d.id IS UNIQUE;
CREATE CONSTRAINT FOR (m:Memory) REQUIRE m.id IS UNIQUE;
CREATE CONSTRAINT FOR (r:Repo) REQUIRE r.id IS UNIQUE;
// Fulltext indexes
CREATE FULLTEXT INDEX memory_search FOR (m:Memory)
ON EACH [m.title, m.content, m.reason, m.tags];
CREATE FULLTEXT INDEX file_text FOR (f:File)
ON EACH [f.path, f.lang];
// Vector index
CALL db.index.vector.createNodeIndex(
'knowledge_vectors',
'Document',
'embedding',
1536,
'cosine'
);
File: services/task_storage.py
Purpose: Persistent storage for task queue
Schema:
CREATE TABLE tasks (
task_id TEXT PRIMARY KEY,
status TEXT NOT NULL,
task_type TEXT NOT NULL,
progress REAL DEFAULT 0.0,
message TEXT,
result TEXT, -- JSON
error TEXT,
metadata TEXT, -- JSON
created_at TEXT NOT NULL,
started_at TEXT,
completed_at TEXT,
worker_id TEXT,
locked_at TEXT
);
CREATE INDEX idx_status ON tasks(status);
CREATE INDEX idx_created ON tasks(created_at);
CREATE INDEX idx_worker ON tasks(worker_id);
Concurrency Control:
async def get_pending_tasks(self, limit: int = 10) -> List[TaskResult]:
"""Get and lock pending tasks"""
# Atomically claim rows with UPDATE ... RETURNING; SQLite has no
# SELECT ... FOR UPDATE, so the worker_id/locked_at stamp acts as the lock
query = """
UPDATE tasks
SET worker_id = ?, locked_at = ?
WHERE task_id IN (
SELECT task_id FROM tasks
WHERE status = 'pending'
AND (locked_at IS NULL OR locked_at < datetime('now', '-5 minutes'))
ORDER BY created_at
LIMIT ?
)
RETURNING *
"""File: services/sql_parser.py
Purpose: Parse SQL queries and extract metadata
Capabilities:
- SQL syntax parsing
- Table/column extraction
- Query type detection
- Dependency analysis
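As an illustration only, table extraction could look like this with sqlglot (an assumed backend; the source does not name the parsing library):

```python
import sqlglot
from sqlglot import exp

sql = "SELECT u.name FROM users u JOIN orders o ON o.user_id = u.id"
tree = sqlglot.parse_one(sql)

# Collect every table referenced anywhere in the statement.
tables = {table.name for table in tree.find_all(exp.Table)}
print(tables)  # {'users', 'orders'}
```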
File: services/ranker.py
Purpose: Rank search results by relevance
Ranking Factors:
- Vector similarity score
- Graph distance
- Metadata match
- Recency
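A sketch of how these factors might be combined into one score (the weights are assumptions for illustration):

```python
def rank_score(vector_sim: float, graph_distance: int,
               metadata_match: float, age_days: float) -> float:
    """Blend the four ranking factors into a single relevance score."""
    recency = 1.0 / (1.0 + age_days / 30.0)      # decays over ~months
    proximity = 1.0 / (1.0 + graph_distance)     # closer nodes score higher
    return (0.5 * vector_sim + 0.2 * proximity
            + 0.2 * metadata_match + 0.1 * recency)
```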
File: services/pack_builder.py
Purpose: Generate context packages for AI tools
Output Format:
{
"files": [
{"path": "src/main.py", "content": "...", "relevance": 0.95},
{"path": "src/utils.py", "content": "...", "relevance": 0.87}
],
"symbols": [
{"name": "process_data", "type": "function", "file": "src/main.py"}
],
"relationships": [
{"from": "main.py", "to": "utils.py", "type": "imports"}
],
"metadata": {
"total_files": 2,
"total_lines": 450,
"languages": ["python"]
}
}
File: services/git_utils.py
Purpose: Git repository operations
Capabilities:
- Commit history retrieval
- Diff extraction
- Branch operations
- File change tracking
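A sketch of commit-history retrieval with GitPython (an assumed dependency; the source does not name the git library):

```python
from git import Repo

repo = Repo("/path/to/repo")
for commit in repo.iter_commits("main", max_count=5):
    changed_files = list(commit.stats.files)  # paths touched by the commit
    print(commit.hexsha[:8], commit.summary, changed_files)
```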
File: mcp_server.py
Purpose: Model Context Protocol server using official SDK (transport-agnostic)
Architecture:
# Official MCP SDK
from mcp.server import Server
from mcp.server.models import InitializationOptions
app = Server("code-graph-knowledge")
# Tool registration
@app.list_tools()
async def list_tools() -> list[Tool]:
return get_tool_definitions()
# Tool execution
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> Sequence[TextContent]:
# Route to appropriate handler
handler = tool_handlers.get(name)
result = await handler(arguments)
return [TextContent(type="text", text=format_result(result))]
Transport Layer (NEW in v0.8):
The MCP server core is transport-agnostic and can work with different transports:
File: core/mcp_sse.py
Purpose: Server-Sent Events transport for network-accessible MCP service
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.routing import Route, Mount
# Create SSE transport
sse_transport = SseServerTransport("/messages/")
async def handle_sse(request: Request) -> Response:
"""Handle SSE connection from MCP clients"""
async with sse_transport.connect_sse(
request.scope, request.receive, request._send
) as streams:
await mcp_server.run(
streams[0], streams[1],
mcp_server.create_initialization_options()
)
return Response()
# Starlette app for MCP SSE
mcp_app = Starlette(routes=[
Route("/sse", endpoint=handle_sse, methods=["GET"]),
Mount("/messages/", app=sse_transport.handle_post_message),
])
Endpoints (Port 8000):
- GET /sse - Establish SSE connection
- POST /messages/* - Receive client messages
Use Cases:
- Docker deployments
- Remote MCP access
- Claude Desktop (remote mode)
- Cline (VS Code extension)
File: main.py (stdio mode)
Purpose: Standard input/output transport for local development
from mcp.server.stdio import stdio_server
async def run_stdio():
"""Run MCP server with stdio transport"""
async with stdio_server() as (read_stream, write_stream):
await mcp_server.run(
read_stream, write_stream,
mcp_server.create_initialization_options()
)
Use Cases:
- Local development
- Claude Desktop (local mode)
- Direct process communication
Key Design: All 30 MCP tools are transport-independent and work with both SSE and stdio without modification.
Tool Categories (30 tools total):
Knowledge:
- query_knowledge: RAG-based Q&A
- search_similar_nodes: Vector similarity search
- add_document: Add document from content
- add_file: Add document from file
- add_directory: Batch directory processing
Code Graph:
- code_graph_ingest_repo: Ingest repository
- code_graph_related: Find related code
- code_graph_impact: Impact analysis
- context_pack: Generate AI context
Memory:
- add_memory: Create memory
- search_memories: Search with filters
- get_memory: Get by ID
- update_memory: Modify memory
- delete_memory: Soft delete
- supersede_memory: Replace with new version
- get_project_summary: Project overview
Memory Extraction:
- extract_from_conversation: Analyze conversations
- extract_from_git_commit: Mine commits
- extract_from_code_comments: Extract from code
- suggest_memory_from_query: Suggest from Q&A
- batch_extract_from_repository: Batch analysis
Task Management:
- get_task_status: Check task status
- watch_task: Monitor single task
- watch_tasks: Monitor multiple tasks
- list_tasks: List all tasks
- cancel_task: Cancel task
- get_queue_stats: Queue statistics
System:
- get_graph_schema: Neo4j schema
- get_statistics: System stats
- clear_knowledge_base: Clear data
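Continuing the client session from the SSE example earlier, a hypothetical tool invocation looks like this:

```python
# Call one of the knowledge tools over the established MCP session.
result = await session.call_tool(
    "query_knowledge",
    {"question": "Where is the task queue implemented?"},
)
print(result.content[0].text)
```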
File: mcp_tools/*.py
Modular Organization:
mcp_tools/
├── __init__.py # Exports
├── tool_definitions.py # Tool schemas
├── knowledge_handlers.py # Knowledge operations
├── code_handlers.py # Code graph operations
├── memory_handlers.py # Memory operations
├── task_handlers.py # Task operations
├── system_handlers.py # System operations
├── resources.py # MCP resources
├── prompts.py # MCP prompts
└── utils.py # Shared utilities
Handler Pattern:
async def handle_query_knowledge(arguments: dict) -> dict:
"""Handle knowledge query request"""
# 1. Validate arguments
question = arguments.get("question")
if not question:
return {"success": False, "error": "Question required"}
# 2. Call service
result = await neo4j_knowledge_service.query(
question=question,
mode=arguments.get("mode", "hybrid")
)
# 3. Return result
return result
graph TB
subgraph "API Layer"
FastAPI
MCPServer[MCP Server]
end
subgraph "Service Layer"
KnowServ[Knowledge Service]
MemServ[Memory Store]
GraphServ[Graph Service]
TaskQ[Task Queue]
CodeIng[Code Ingestor]
end
subgraph "External"
Neo4j
LLM[LLM Providers]
end
FastAPI --> KnowServ
FastAPI --> MemServ
FastAPI --> GraphServ
FastAPI --> TaskQ
MCPServer --> KnowServ
MCPServer --> MemServ
MCPServer --> GraphServ
MCPServer --> TaskQ
KnowServ --> Neo4j
KnowServ --> LLM
MemServ --> Neo4j
GraphServ --> Neo4j
CodeIng --> GraphServ
TaskQ --> KnowServ
TaskQ --> CodeIng
Critical for avoiding circular dependencies:
# 1. Configuration (no dependencies)
from src.codebase_rag.config import settings
# 2. Storage layer (no app dependencies)
neo4j_connection = Neo4jGraphStore(...)
# 3. Service layer (depends on storage)
knowledge_service = Neo4jKnowledgeService()
memory_store = MemoryStore()
graph_service = Neo4jGraphService()
# 4. Processors (depend on services)
processor_registry.initialize_default_processors(knowledge_service)
# 5. Task queue (depends on processors)
await task_queue.start()
# 6. API layer (depends on all services)
app = create_app()
1. Direct Method Calls (within same process):
# FastAPI route calls service
result = await knowledge_service.query(question)
2. Task Queue (async operations):
# Submit task for background processing
task_id = await task_queue.submit_task(
task_func=process_large_document,
document_path=path
)
3. Event Streaming (real-time updates):
# SSE for task progress
async def task_progress_stream(task_id: str):
while True:
task = task_queue.get_task(task_id)
yield f"data: {json.dumps(task.to_dict())}\n\n"
await asyncio.sleep(1)
All components are configured via environment variables:
# config.py
class Settings(BaseSettings):
# Database
neo4j_uri: str = "bolt://localhost:7687"
neo4j_username: str = "neo4j"
neo4j_password: str = "password"
# LLM
llm_provider: str = "ollama"
ollama_model: str = "llama2"
# Timeouts
connection_timeout: int = 30
operation_timeout: int = 120
class Config:
env_file = ".env"Components access configuration:
from src.codebase_rag.config import settings
# Use in service
self.timeout = settings.operation_timeout
Each component has corresponding tests:
tests/
├── test_neo4j_knowledge_service.py
├── test_memory_store.py
├── test_graph_service.py
├── test_task_queue.py
├── test_code_ingestor.py
└── test_mcp_handlers.py
Test Patterns:
@pytest.mark.asyncio
async def test_add_memory():
# Setup
memory_store = MemoryStore()
await memory_store.initialize()
# Execute
result = await memory_store.add_memory(
project_id="test",
memory_type="decision",
title="Test decision",
content="Test content"
)
# Assert
assert result["success"] == True
assert "memory_id" in result
# Cleanup
await memory_store.close()
The component architecture follows these principles:
- Single Responsibility: Each component has one clear purpose
- Loose Coupling: Components communicate via interfaces
- High Cohesion: Related functionality grouped together
- Dependency Injection: Services injected rather than created
- Async-First: All I/O operations are asynchronous
- Testability: Components designed for easy testing
This modular design enables:
- Independent development and testing
- Easy component replacement
- Clear debugging and troubleshooting
- Scalable architecture evolution