-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathlifespan.py
More file actions
77 lines (58 loc) · 2.29 KB
/
lifespan.py
File metadata and controls
77 lines (58 loc) · 2.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
"""
Application lifecycle management module
"""
from contextlib import asynccontextmanager
from fastapi import FastAPI
from loguru import logger
from codebase_rag.services.knowledge import neo4j_knowledge_service
from codebase_rag.services.tasks import task_queue, processor_registry
from codebase_rag.services.memory import memory_store
@asynccontextmanager
async def lifespan(app: FastAPI):
"""application lifecycle management"""
logger.info("Starting Code Graph Knowledge Service...")
try:
# initialize services
await initialize_services()
yield
except Exception as e:
logger.error(f"Service initialization failed: {e}")
raise
finally:
# clean up resources
await cleanup_services()
async def initialize_services():
"""initialize all services"""
# initialize Neo4j knowledge graph service
logger.info("Initializing Neo4j Knowledge Service...")
if not await neo4j_knowledge_service.initialize():
logger.error("Failed to initialize Neo4j Knowledge Service")
raise RuntimeError("Neo4j service initialization failed")
logger.info("Neo4j Knowledge Service initialized successfully")
# initialize Memory Store
logger.info("Initializing Memory Store...")
if not await memory_store.initialize():
logger.warning("Memory Store initialization failed - memory features may not work")
else:
logger.info("Memory Store initialized successfully")
# initialize task processors
logger.info("Initializing Task Processors...")
processor_registry.initialize_default_processors(neo4j_knowledge_service)
logger.info("Task Processors initialized successfully")
# initialize task queue
logger.info("Initializing Task Queue...")
await task_queue.start()
logger.info("Task Queue initialized successfully")
async def cleanup_services():
"""clean up all services"""
logger.info("Shutting down services...")
try:
# stop task queue
await task_queue.stop()
# close Memory Store
await memory_store.close()
# close Neo4j service
await neo4j_knowledge_service.close()
logger.info("Services shut down successfully")
except Exception as e:
logger.error(f"Error during shutdown: {e}")