diff --git a/README.md b/README.md index 6de70c8..de2bdea 100644 --- a/README.md +++ b/README.md @@ -68,6 +68,43 @@ This ensures that your application always has the required models available and ### Project Management Endpoints - `POST /generate-project-status-report` - Generate project status report from OpenProject data +### Evaluate Projects Similarities + +**POST** `/evaluate-projects-similarities` + +**Request Body:** +```json +{ + "project": { + "id": "YOUR_PORTFOLIO_PROJECT_ID", + "type": "portfolio" + }, + "openproject": { + "base_url": "https://your-openproject-instance.com", + "user_token": "YOUR_OPENPROJECT_API_KEY" + } +} +``` + +**Response Example:** +```json +{ + "portfolio": "Portfolio Project Name", + "candidates": [ + { + "name": "Candidate Project 1", + "score": 85.0, + "project_id": "123", + "reason": "Strong alignment with portfolio goals." + } + ], + "text": "...LLM explanation..." +} +``` + +**Note:** +- The `project` object only requires an `id` field. The `type` field is accepted for backward compatibility but is no longer used. +- The `openproject` object must be provided with each request and contains the OpenProject instance URL and user API token. 
## OpenAI API Compatibility diff --git a/config/settings.py b/config/settings.py index 1a151b9..27764a4 100644 --- a/config/settings.py +++ b/config/settings.py @@ -6,27 +6,27 @@ class Settings: """Application settings.""" - + # Logging configuration LOG_LEVEL: str = os.getenv("LOG_LEVEL", "INFO") LOG_FORMAT: Optional[str] = os.getenv("LOG_FORMAT", None) - + # Ollama configuration OLLAMA_URL: str = os.getenv("OLLAMA_URL", "http://ollama:11434") - OLLAMA_MODEL: str = os.getenv("OLLAMA_MODEL", "mistral:latest") - + OLLAMA_MODEL: str = os.getenv("OLLAMA_MODEL", "gemma:2b") + # Model management MODELS_TO_PULL: str = os.getenv("MODELS_TO_PULL", "mistral:latest") REQUIRED_MODELS: list = os.getenv("REQUIRED_MODELS", "mistral:latest").split(",") - + # Generation parameters GENERATION_NUM_PREDICT: int = int(os.getenv("GENERATION_NUM_PREDICT", "1000")) GENERATION_TEMPERATURE: float = float(os.getenv("GENERATION_TEMPERATURE", "0.7")) - + # API configuration API_HOST: str = os.getenv("API_HOST", "0.0.0.0") API_PORT: int = int(os.getenv("API_PORT", "8000")) - + # RAG Configuration DOCUMENTS_PATH: str = os.getenv("DOCUMENTS_PATH", "documents") VECTOR_STORE_PATH: str = os.getenv("VECTOR_STORE_PATH", "vector_store") @@ -36,5 +36,4 @@ class Settings: CHUNK_OVERLAP: int = int(os.getenv("CHUNK_OVERLAP", "100")) MAX_RETRIEVED_DOCS: int = int(os.getenv("MAX_RETRIEVED_DOCS", "5")) - settings = Settings() diff --git a/src/api/routes.py b/src/api/routes.py index d6a018f..f9f2b0c 100644 --- a/src/api/routes.py +++ b/src/api/routes.py @@ -5,7 +5,8 @@ GenerationRequest, GenerationResponse, HealthResponse, ChatCompletionRequest, ChatCompletionResponse, ChatMessage, ChatChoice, Usage, ModelsResponse, ModelInfo, ErrorResponse, ErrorDetail, - ProjectStatusReportRequest, ProjectStatusReportResponse + ProjectStatusReportRequest, ProjectStatusReportResponse, + SuggestRequest, SuggestResponse, ProjectSimilarityRequest ) from src.pipelines.generation import generation_pipeline from 
src.services.openproject_client import OpenProjectClient, OpenProjectAPIError @@ -25,10 +26,10 @@ def health_check(): @router.post("/generate", response_model=GenerationResponse) def generate_text(request: GenerationRequest): """Generate text from a prompt. - + Args: request: The generation request containing the prompt - + Returns: Generated text response """ @@ -44,10 +45,10 @@ def generate_text(request: GenerationRequest): @router.post("/v1/chat/completions", response_model=ChatCompletionResponse) def create_chat_completion(request: ChatCompletionRequest): """Create a chat completion (OpenAI-compatible endpoint). - + Args: request: Chat completion request with messages and parameters - + Returns: Chat completion response in OpenAI format """ @@ -65,13 +66,13 @@ def create_chat_completion(request: ChatCompletionRequest): } } ) - + # Generate response using the pipeline response_text, usage_info = generation_pipeline.chat_completion(request) - + # Create response in OpenAI format completion_id = f"chatcmpl-{uuid.uuid4().hex[:29]}" - + response = ChatCompletionResponse( id=completion_id, model=request.model, @@ -87,9 +88,9 @@ def create_chat_completion(request: ChatCompletionRequest): ], usage=Usage(**usage_info) ) - + return response - + except HTTPException: raise except Exception as e: @@ -111,20 +112,20 @@ def create_chat_completion(request: ChatCompletionRequest): @router.post("/rag/initialize") def initialize_rag_system(): """Initialize the RAG system by loading PMFlex documents. 
- + Returns: Initialization status and results """ try: from src.pipelines.rag_pipeline import rag_pipeline result = rag_pipeline.initialize() - + return { "status": "success", "message": "RAG system initialization completed", "result": result } - + except Exception as e: logger.error(f"Error initializing RAG system: {str(e)}") raise HTTPException( @@ -142,7 +143,7 @@ def initialize_rag_system(): @router.get("/rag/status") def get_rag_status(): """Get the current status of the RAG system. - + Returns: RAG system status and statistics """ @@ -150,13 +151,13 @@ def get_rag_status(): from src.pipelines.rag_pipeline import rag_pipeline stats = rag_pipeline.get_pipeline_stats() validation = rag_pipeline.validate_setup() - + return { "status": "success", "pipeline_stats": stats, "validation": validation } - + except Exception as e: logger.error(f"Error getting RAG status: {str(e)}") raise HTTPException( @@ -174,20 +175,20 @@ def get_rag_status(): @router.post("/rag/refresh") def refresh_rag_documents(): """Refresh the RAG document index. - + Returns: Refresh operation results """ try: from src.pipelines.rag_pipeline import rag_pipeline result = rag_pipeline.refresh_documents() - + return { "status": "success", "message": "Document refresh completed", "result": result } - + except Exception as e: logger.error(f"Error refreshing RAG documents: {str(e)}") raise HTTPException( @@ -205,25 +206,25 @@ def refresh_rag_documents(): @router.post("/rag/search") def search_rag_documents(query: str, max_results: int = 5): """Search RAG documents for specific information. 
- + Args: query: Search query string max_results: Maximum number of results to return - + Returns: Search results from RAG system """ try: from src.pipelines.rag_pipeline import rag_pipeline results = rag_pipeline.search_documents(query, max_results) - + return { "status": "success", "query": query, "results": results, "total_results": len(results) } - + except Exception as e: logger.error(f"Error searching RAG documents: {str(e)}") raise HTTPException( @@ -245,10 +246,10 @@ async def generate_project_status_report( request: ProjectStatusReportRequest ): """Generate a project status report from OpenProject work packages. - + Args: request: Project status report request with project info and OpenProject instance info - + Returns: Generated project status report """ @@ -258,7 +259,7 @@ async def generate_project_status_report( project_type = request.project.type base_url = request.openproject.base_url user_token = request.openproject.user_token - + # Validate user token if not user_token: raise HTTPException( @@ -271,22 +272,22 @@ async def generate_project_status_report( } } ) - + # Initialize OpenProject client openproject_client = OpenProjectClient( base_url=base_url, api_key=user_token ) - + logger.info(f"Generating project status report for project {project_id} (type: {project_type})") - + # Fetch work packages from OpenProject try: work_packages = await openproject_client.get_work_packages(str(project_id)) logger.info(f"Fetched {len(work_packages)} work packages") except OpenProjectAPIError as e: logger.error(f"OpenProject API error: {e.message}") - + # Map OpenProject API errors to appropriate HTTP status codes if e.status_code == 401: raise HTTPException(status_code=401, detail={ @@ -328,7 +329,7 @@ async def generate_project_status_report( "code": "openproject_api_error" } }) - + # Generate project status report using LLM try: report_text, analysis = generation_pipeline.generate_project_status_report( @@ -337,9 +338,9 @@ async def 
generate_project_status_report( openproject_base_url=base_url, work_packages=work_packages ) - + logger.info(f"Successfully generated project status report for project {project_id}") - + return ProjectStatusReportResponse( project_id=project_id, project_type=project_type, @@ -347,7 +348,7 @@ async def generate_project_status_report( work_packages_analyzed=len(work_packages), openproject_base_url=base_url ) - + except Exception as e: logger.error(f"Error generating report: {str(e)}") raise HTTPException( @@ -360,7 +361,7 @@ async def generate_project_status_report( } } ) - + except HTTPException: raise except Exception as e: @@ -380,20 +381,20 @@ async def generate_project_status_report( @router.get("/v1/models", response_model=ModelsResponse) def list_models(): """List available models (OpenAI-compatible endpoint). - + Returns: List of available models in OpenAI format """ try: available_models = generation_pipeline.get_available_models() - + models = [ ModelInfo(id=model_id) for model_id in available_models ] - + return ModelsResponse(data=models) - + except Exception as e: logger.error(f"Error listing models: {str(e)}") raise HTTPException( @@ -413,16 +414,16 @@ def list_models(): @router.get("/v1/models/{model_id}") def get_model(model_id: str): """Get specific model information (OpenAI-compatible endpoint). 
- + Args: model_id: The model ID to retrieve - + Returns: Model information """ try: available_models = generation_pipeline.get_available_models() - + if model_id not in available_models: raise HTTPException( status_code=404, @@ -435,9 +436,9 @@ def get_model(model_id: str): } } ) - + return ModelInfo(id=model_id) - + except HTTPException: raise except Exception as e: @@ -452,3 +453,18 @@ def get_model(model_id: str): } } ) + +@router.post("/evaluate-projects-similarities", response_model=SuggestResponse) +def suggest_endpoint(request: ProjectSimilarityRequest): + try: + # Use OpenProject info from the request, not from config + openproject_client = OpenProjectClient( + base_url=request.openproject.base_url, + api_key=request.openproject.user_token + ) + from src.pipelines.suggestion import SuggestionPipeline + suggestion_pipeline = SuggestionPipeline(openproject_client) + result = suggestion_pipeline.suggest(str(request.project.id)) + return result + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) diff --git a/src/models/schemas.py b/src/models/schemas.py index c45bda4..c5b9c48 100644 --- a/src/models/schemas.py +++ b/src/models/schemas.py @@ -21,6 +21,21 @@ class HealthResponse(BaseModel): status: str +# --- Suggestion Schemas --- +class CandidateSuggestion(BaseModel): + name: Optional[str] = None + score: Optional[float] = None + project_id: int + reason: str + +class SuggestRequest(BaseModel): + project_id: int + +class SuggestResponse(BaseModel): + portfolio: Optional[str] = None + candidates: List[CandidateSuggestion] + text: str + # OpenAI Chat Completion Compatible Models class ChatMessage(BaseModel): @@ -126,6 +141,9 @@ class WorkPackage(BaseModel): updated_at: str description: Optional[Dict[str, Any]] = None +class ProjectSimilarityRequest(BaseModel): + project: ProjectInfo + openproject: OpenProjectInfo class ProjectStatusReportResponse(BaseModel): """Response model for project status report.""" diff --git 
a/src/pipelines/suggestion.py b/src/pipelines/suggestion.py new file mode 100644 index 0000000..279657a --- /dev/null +++ b/src/pipelines/suggestion.py @@ -0,0 +1,274 @@ +import logging +from typing import List, Dict, Any, Optional, Union +from dataclasses import dataclass +from config.settings import settings +from src.services.openproject_client import OpenProjectClient, OpenProjectAPIError +from src.pipelines.generation import generation_pipeline +import asyncio +import json +import concurrent.futures +import re + +logger = logging.getLogger(__name__) + +@dataclass +class Candidate: + project_id: Union[str, int] + name: str + score: Optional[float] + reason: str + +class SuggestionPipeline: + """Pipeline for suggesting suitable sub-projects for a portfolio project.""" + def __init__(self, openproject_client: OpenProjectClient): + self.openproject_client = openproject_client + + def suggest(self, project_id: str) -> Dict[str, Any]: + """Main entry point: Suggest suitable sub-projects for a portfolio project.""" + try: + portfolio_project = asyncio.run(self.openproject_client.get_project_info(project_id)) + logger.info(f"Fetched portfolio project: {portfolio_project.get('name', project_id)}") + + if portfolio_project.get("projectType") != "portfolio": + logger.warning("Project is not a portfolio. Returning message.") + return { + "portfolio": portfolio_project.get("name"), + "candidates": [], + "text": "The selected project is not a portfolio project." + } + + sub_projects = self._get_sub_projects_parallel_with_fallback(project_id) + logger.info(f"Fetched {len(sub_projects)} sub-projects for portfolio {project_id}") + + if not portfolio_project or not sub_projects: + logger.warning("No portfolio or sub-project info available. Returning empty candidates.") + return { + "portfolio": portfolio_project.get("name") if portfolio_project else None, + "candidates": [], + "text": "No portfolio or sub-project info available." 
+ } + + candidates, llm_response = self._llm_score_candidates(portfolio_project, sub_projects) + candidate_list = [self._candidate_to_dict(c) for c in candidates] + + return { + "portfolio": portfolio_project.get("name"), + "candidates": candidate_list, + "text": llm_response + } + except OpenProjectAPIError as e: + logger.error(f"OpenProject API error: {e.message}") + raise + except Exception as e: + logger.error(f"Suggestion pipeline error: {e}") + raise + + def _get_sub_projects_parallel_with_fallback(self, project_id: str) -> List[Dict[str, Any]]: + """Recursively traverse all descendants of the portfolio, skipping Teilportfolio but traversing through them to reach programs and projects. Collect all programs and projects under the portfolio, regardless of depth, but exclude any with 'Teilportfolio' in the name from the results (but not from traversal).""" + all_projects = asyncio.run(self.openproject_client.get_all_projects()) + for p in all_projects: + logger.debug(f"Project {p.get('id')}: _links = {p.get('_links')}") + id_to_project = {str(p.get('id')): p for p in all_projects} + # Build parent->children map + parent_to_children = {} + for p in all_projects: + parent_href = p.get('_links', {}).get('parent', {}).get('href') + if parent_href: + parent_id = parent_href.rstrip('/').split('/')[-1] + parent_to_children.setdefault(parent_id, []).append(p) + # Recursive traversal + def collect_candidates(current_id): + candidates = [] + children = parent_to_children.get(str(current_id), []) + for child in children: + name = child.get('name') or '' + project_type = child.get('projectType') + if 'Teilportfolio' in name: + # Skip as candidate, but traverse its children + candidates.extend(collect_candidates(child.get('id'))) + elif project_type in ('program', 'project'): + candidates.append(child) + # Also traverse children in case of nested programs + candidates.extend(collect_candidates(child.get('id'))) + else: + # Traverse children for any other type + 
candidates.extend(collect_candidates(child.get('id'))) + return candidates + return collect_candidates(project_id) + + def _extract_custom_fields(self, project_info: dict) -> dict: + """Extract custom fields from a project info dict, using only the 'raw' value if present.""" + result = {} + for k, v in project_info.items(): + if k.startswith('customField'): + if isinstance(v, dict) and 'raw' in v: + result[k] = v['raw'] + else: + result[k] = v + return result + + def _get_work_package_names(self, project_id: str) -> List[str]: + """Fetch work package names for a project synchronously.""" + if not project_id: + return [] + try: + work_packages = asyncio.run(self.openproject_client.get_work_packages(str(project_id))) + return [wp.subject for wp in work_packages if hasattr(wp, 'subject')] + except Exception as e: + logger.warning(f"Failed to fetch work packages for project {project_id}: {e}") + return [] + + def _build_suggestion_prompt(self, portfolio_info: dict, sub_projects: List[dict]) -> str: + """Build the LLM prompt with all relevant info.""" + portfolio_custom_fields = self._extract_custom_fields(portfolio_info) + portfolio_wp_names = self._get_work_package_names(str(portfolio_info.get('id', ''))) + prompt = [ + "You are an expert project portfolio advisor.", + "Portfolio project info:", + f"Name: {portfolio_info.get('name')}", + f"Description: {portfolio_info.get('description', {}).get('raw', '')}" + ] + if portfolio_custom_fields: + prompt.append("Custom fields:") + prompt.extend([f" {k}: {v}" for k, v in portfolio_custom_fields.items()]) + if portfolio_wp_names: + prompt.append("Work packages:") + prompt.extend([f" - {wp_name}" for wp_name in portfolio_wp_names]) + prompt.append("\nSub-projects:") + for i, sp in enumerate(sub_projects, 1): + sp_custom_fields = self._extract_custom_fields(sp) + sp_wp_names = self._get_work_package_names(str(sp.get('id', ''))) + prompt.append(f"{i}. 
Name: {sp.get('name')}") + prompt.append(f" Description: {sp.get('description', {}).get('raw', '')}") + prompt.append(f" ID: {sp.get('id')}") + if sp_custom_fields: + prompt.append(" Custom fields:") + prompt.extend([f" {k}: {v}" for k, v in sp_custom_fields.items()]) + if sp_wp_names: + prompt.append(" Work packages:") + prompt.extend([f" - {wp_name}" for wp_name in sp_wp_names]) + prompt.append( + "For all projects and programs, rate its suitability as a portfolio candidate for the above portfolio " + "on a scale from 0 to 100 (integer), and briefly explain your reasoning (1-2 sentences). " + "Only return projects that have a value above 70. " + "Return a JSON list of objects with fields: project_id, score, reason" + ) + return '\n'.join(prompt) + + def _llm_score_candidates(self, portfolio_project: dict, sub_projects: List[dict]) -> tuple[list[Candidate], str]: + """ + Parse LLM response and return a list of suitable candidate projects. + Returns: (List[Candidate], str) -- candidates and raw LLM response. + """ + prompt = self._build_suggestion_prompt(portfolio_project, sub_projects) + logger.info(f"LLM prompt for suggestion pipeline:\n{prompt}") + llm_response = generation_pipeline.generate(prompt) + raw_candidates = [] + # 1. Try parsing as a JSON array + try: + raw_candidates = json.loads(llm_response) + if isinstance(raw_candidates, dict): + raw_candidates = [raw_candidates] + except Exception: + # 2. Try wrapping in brackets and removing trailing commas + try: + fixed = llm_response.strip() + if not fixed.startswith("["): + fixed = "[" + fixed + if not fixed.endswith("]"): + fixed = fixed + "]" + fixed = re.sub(r",\s*]", "]", fixed) + raw_candidates = json.loads(fixed) + if isinstance(raw_candidates, dict): + raw_candidates = [raw_candidates] + except Exception: + # 3. Use regex to extract all JSON objects + # This regex extracts all top-level JSON objects from the LLM response. 
+ objects = re.findall(r"{[^{}]*}", llm_response, re.DOTALL) + if not isinstance(raw_candidates, list): + raw_candidates = [] + for obj in objects: + try: + raw_candidates.append(json.loads(obj)) + except Exception: + continue + if not raw_candidates: + logger.error(f"Failed to parse LLM response as JSON: {llm_response}") + candidates = self._parse_candidates_from_text(llm_response, sub_projects) + candidates = [c for c in candidates if c.score is not None and c.score > 70] + return candidates, llm_response + + candidates = [self._dict_to_candidate(c, sub_projects) for c in raw_candidates] + candidates = [c for c in candidates if c.score is not None and c.score > 70] + return candidates, llm_response + + def _dict_to_candidate(self, c: dict, sub_projects: List[dict]) -> Candidate: + """Convert a dict (from LLM JSON) to a Candidate object, matching project_id to sub-projects.""" + sp = next((sp for sp in sub_projects if str(sp.get("id")) == str(c.get("id", c.get("project_id")))), None) + project_id_out = str(sp.get("id")) if sp and sp.get("id") is not None else str(c.get("id", c.get("project_id", ""))) + name = str(sp.get("name")) if sp and sp.get("name") is not None else f"ID {project_id_out}" + score = None + score_val = c.get("score") + if score_val is not None: + try: + score_val_str = str(score_val).strip() + if score_val_str and score_val_str.lower() != 'none': + score = float(score_val_str) + except Exception: + score = None + reason = str(c.get("reason", "")) + return Candidate(project_id=project_id_out, name=name, score=score, reason=reason) + + def _parse_candidates_from_text(self, text: str, sub_projects: List[dict]) -> List[Candidate]: + """Attempt to extract candidate info from a text block if LLM did not return JSON.""" + candidates = [] + pattern = re.compile(r"(?P\d+)\.\s*(?P[^\n]+)\n\s*- project_id: (?P\d+)\n\s*- score: (?P[\d\.]+)\n\s*- reason: (?P.+?)(?=\n\d+\.|$)", re.DOTALL) + for match in pattern.finditer(text): + project_id = 
str(match.group("project_id")) if match.group("project_id") is not None else "" + parsed_name = str(match.group("name")).strip() if match.group("name") is not None else "Unknown" + sp = next((sp for sp in sub_projects if str(sp.get("id")) == project_id), None) + if sp and sp.get("name") is not None: + name = str(sp.get("name")) + project_id_out = str(sp.get("id")) + else: + # Try fuzzy match by name if project_id fails + name = None + project_id_out = project_id + for s in sub_projects: + s_name = str(s.get("name", "")) + if parsed_name.lower() in s_name.lower() or s_name.lower() in parsed_name.lower(): + name = s_name + project_id_out = str(s.get("id", "")) + break + if not name: + name = parsed_name + logger.warning(f"No sub-project found for project_id {project_id}. Using parsed name '{parsed_name}'.") + score_val = match.group("score") + score = None + try: + if isinstance(score_val, (int, float)): + score = float(score_val) + elif isinstance(score_val, str): + score_val_str = score_val.strip() + if score_val_str and score_val_str.lower() != 'none': + score = float(score_val_str) + elif score_val is not None: + score_val_str = str(score_val).strip() + if score_val_str and score_val_str.lower() != 'none': + score = float(score_val_str) + except Exception: + score = None + reason_val = match.group("reason") + reason = str(reason_val).strip() if reason_val is not None else "" + candidates.append(Candidate(project_id=project_id_out, name=name, score=score, reason=reason)) + candidates.sort(key=lambda x: (x.score is not None, x.score), reverse=True) + return candidates + + def _candidate_to_dict(self, c: Candidate) -> dict: + """Convert a Candidate dataclass to a dict for API response. 
Include name, project_id, score, and reason.""" + return { + "name": c.name, + "score": c.score, + "project_id": c.project_id, + "reason": c.reason + } diff --git a/src/services/openproject_client.py b/src/services/openproject_client.py index 105aac0..e339fbe 100644 --- a/src/services/openproject_client.py +++ b/src/services/openproject_client.py @@ -19,10 +19,10 @@ def __init__(self, message: str, status_code: Optional[int] = None): class OpenProjectClient: """Client for interacting with OpenProject API.""" - + def __init__(self, base_url: str, api_key: str): """Initialize the OpenProject client. - + Args: base_url: Base URL of the OpenProject instance api_key: API key for authentication @@ -38,40 +38,39 @@ def __init__(self, base_url: str, api_key: str): "Accept": "application/hal+json", "Content-Type": "application/json" } - + async def get_work_packages(self, project_id: str) -> List[WorkPackage]: """Fetch all work packages for a specific project. - + Args: project_id: The OpenProject project ID - + Returns: List of WorkPackage objects - + Raises: OpenProjectAPIError: If API request fails """ url = f"{self.base_url}/api/v3/projects/{project_id}/work_packages" - + headers = self.headers.copy() # Use a copy to avoid modifying self.headers try: async with httpx.AsyncClient(timeout=300.0) as client: logger.info(f"Fetching work packages from: {url}") - - response = await client.get(url, headers=self.headers) - + response = await client.get(url, headers=headers) + if response.status_code == 401: raise OpenProjectAPIError( - "Invalid API key or insufficient permissions", + "Invalid API key or insufficient permissions", status_code=401 ) elif response.status_code == 403: raise OpenProjectAPIError( - "Insufficient permissions to access this project", + "Insufficient permissions to access this project", status_code=403 ) elif response.status_code == 404: raise OpenProjectAPIError( - f"Project with ID '{project_id}' not found", + f"Project with ID '{project_id}' not 
found", status_code=404 ) elif response.status_code != 200: @@ -79,10 +78,10 @@ async def get_work_packages(self, project_id: str) -> List[WorkPackage]: f"OpenProject API returned status {response.status_code}: {response.text}", status_code=response.status_code ) - + data = response.json() work_packages = [] - + # Parse work packages from the response if "_embedded" in data and "elements" in data["_embedded"]: for wp_data in data["_embedded"]["elements"]: @@ -92,10 +91,10 @@ async def get_work_packages(self, project_id: str) -> List[WorkPackage]: except Exception as e: logger.warning(f"Failed to parse work package {wp_data.get('id', 'unknown')}: {e}") continue - + logger.info(f"Successfully fetched {len(work_packages)} work packages") return work_packages - + except httpx.TimeoutException: raise OpenProjectAPIError("Request to OpenProject API timed out", status_code=408) except httpx.ConnectError: @@ -106,13 +105,13 @@ async def get_work_packages(self, project_id: str) -> List[WorkPackage]: if isinstance(e, OpenProjectAPIError): raise raise OpenProjectAPIError(f"Unexpected error: {str(e)}", status_code=500) - + def _parse_work_package(self, wp_data: Dict[str, Any]) -> WorkPackage: """Parse work package data from OpenProject API response. 
- + Args: wp_data: Raw work package data from API - + Returns: WorkPackage object """ @@ -124,7 +123,7 @@ def _parse_work_package(self, wp_data: Dict[str, Any]) -> WorkPackage: "name": wp_data["status"].get("name"), "href": wp_data["status"].get("href") } - + # Extract priority information priority = None if "priority" in wp_data and wp_data["priority"]: @@ -133,7 +132,7 @@ def _parse_work_package(self, wp_data: Dict[str, Any]) -> WorkPackage: "name": wp_data["priority"].get("name"), "href": wp_data["priority"].get("href") } - + # Extract assignee information assignee = None if "assignee" in wp_data and wp_data["assignee"]: @@ -142,7 +141,7 @@ def _parse_work_package(self, wp_data: Dict[str, Any]) -> WorkPackage: "name": wp_data["assignee"].get("name"), "href": wp_data["assignee"].get("href") } - + # Extract description description = None if "description" in wp_data and wp_data["description"]: @@ -151,9 +150,9 @@ def _parse_work_package(self, wp_data: Dict[str, Any]) -> WorkPackage: "raw": wp_data["description"].get("raw"), "html": wp_data["description"].get("html") } - + return WorkPackage( - id=wp_data["id"], + id=str(wp_data["id"]), subject=wp_data.get("subject", ""), status=status, priority=priority, @@ -164,40 +163,42 @@ def _parse_work_package(self, wp_data: Dict[str, Any]) -> WorkPackage: updated_at=wp_data.get("updatedAt", ""), description=description ) - + async def get_project_info(self, project_id: str) -> Dict[str, Any]: """Fetch basic project information. 
- + Args: project_id: The OpenProject project ID - + Returns: Project information dictionary - + Raises: OpenProjectAPIError: If API request fails """ url = f"{self.base_url}/api/v3/projects/{project_id}" - + headers = self.headers.copy() # Use a copy to avoid modifying self.headers + logger.info(f"Sending request to {url} with headers: [REDACTED]") + try: async with httpx.AsyncClient(timeout=300.0) as client: logger.info(f"Fetching project info from: {url}") - - response = await client.get(url, headers=self.headers) - + + response = await client.get(url, headers=headers) + if response.status_code == 401: raise OpenProjectAPIError( - "Invalid API key or insufficient permissions", + "Invalid API key or insufficient permissions", status_code=401 ) elif response.status_code == 403: raise OpenProjectAPIError( - "Insufficient permissions to access this project", + "Insufficient permissions to access this project", status_code=403 ) elif response.status_code == 404: raise OpenProjectAPIError( - f"Project with ID '{project_id}' not found", + f"Project with ID '{project_id}' not found", status_code=404 ) elif response.status_code != 200: @@ -205,9 +206,59 @@ async def get_project_info(self, project_id: str) -> Dict[str, Any]: f"OpenProject API returned status {response.status_code}: {response.text}", status_code=response.status_code ) - + return response.json() - + + except httpx.TimeoutException: + raise OpenProjectAPIError("Request to OpenProject API timed out", status_code=408) + except httpx.ConnectError: + raise OpenProjectAPIError("Could not connect to OpenProject API", status_code=503) + except httpx.HTTPError as e: + raise OpenProjectAPIError(f"HTTP error occurred: {str(e)}", status_code=500) + except Exception as e: + if isinstance(e, OpenProjectAPIError): + raise + raise OpenProjectAPIError(f"Unexpected error: {str(e)}", status_code=500) + + async def get_all_projects(self) -> List[Dict[str, Any]]: + """Fetch all projects from OpenProject, handling pagination.""" 
+ url = f"{self.base_url}/api/v3/projects" + headers = self.headers.copy() + projects = [] + offset = 1 + page_size = 100 # Adjust as needed + + try: + while True: + params = {"offset": offset, "pageSize": page_size} + async with httpx.AsyncClient(timeout=300.0) as client: + logger.info(f"Fetching projects from: {url} (offset={offset})") + response = await client.get(url, headers=headers, params=params) + + if response.status_code == 401: + raise OpenProjectAPIError( + "Invalid API key or insufficient permissions", + status_code=401 + ) + elif response.status_code == 403: + raise OpenProjectAPIError( + "Insufficient permissions to access projects", + status_code=403 + ) + elif response.status_code != 200: + raise OpenProjectAPIError( + f"OpenProject API returned status {response.status_code}: {response.text}", + status_code=response.status_code + ) + + data = response.json() + elements = data.get("_embedded", {}).get("elements", []) + projects.extend(elements) + if len(elements) < page_size: + break + offset += page_size + logger.info(f"Successfully fetched {len(projects)} projects.") + return projects except httpx.TimeoutException: raise OpenProjectAPIError("Request to OpenProject API timed out", status_code=408) except httpx.ConnectError: diff --git a/src/templates/report_templates.py b/src/templates/report_templates.py index 9e2c299..58df439 100644 --- a/src/templates/report_templates.py +++ b/src/templates/report_templates.py @@ -8,14 +8,14 @@ class ProjectReportAnalyzer: """Analyzer for work package data to extract insights.""" - + @staticmethod def analyze_work_packages(work_packages: List[WorkPackage]) -> Dict[str, Any]: """Analyze work packages and extract key metrics. 
- + Args: work_packages: List of work packages to analyze - + Returns: Dictionary containing analysis results """ @@ -29,54 +29,54 @@ def analyze_work_packages(work_packages: List[WorkPackage]) -> Dict[str, Any]: "timeline_insights": {}, "key_metrics": {} } - + # Basic counts total_count = len(work_packages) - + # Status distribution status_distribution = {} for wp in work_packages: status_name = wp.status.get("name", "Unknown") if wp.status else "Unknown" status_distribution[status_name] = status_distribution.get(status_name, 0) + 1 - + # Priority distribution priority_distribution = {} for wp in work_packages: priority_name = wp.priority.get("name", "No Priority") if wp.priority else "No Priority" priority_distribution[priority_name] = priority_distribution.get(priority_name, 0) + 1 - + # Completion statistics completion_ratios = [wp.done_ratio for wp in work_packages if wp.done_ratio is not None] avg_completion = sum(completion_ratios) / len(completion_ratios) if completion_ratios else 0 completed_count = sum(1 for wp in work_packages if wp.done_ratio == 100) in_progress_count = sum(1 for wp in work_packages if wp.done_ratio and 0 < wp.done_ratio < 100) not_started_count = sum(1 for wp in work_packages if not wp.done_ratio or wp.done_ratio == 0) - + completion_stats = { "average_completion": round(avg_completion, 1), "completed": completed_count, "in_progress": in_progress_count, "not_started": not_started_count } - + # Assignee workload assignee_workload = {} for wp in work_packages: assignee_name = wp.assignee.get("name", "Unassigned") if wp.assignee else "Unassigned" if assignee_name not in assignee_workload: assignee_workload[assignee_name] = {"total": 0, "completed": 0, "in_progress": 0} - + assignee_workload[assignee_name]["total"] += 1 if wp.done_ratio == 100: assignee_workload[assignee_name]["completed"] += 1 elif wp.done_ratio and wp.done_ratio > 0: assignee_workload[assignee_name]["in_progress"] += 1 - + # Timeline insights now = datetime.now() 
overdue_count = 0 upcoming_deadlines = 0 - + for wp in work_packages: if wp.due_date: try: @@ -87,19 +87,19 @@ def analyze_work_packages(work_packages: List[WorkPackage]) -> Dict[str, Any]: upcoming_deadlines += 1 except (ValueError, TypeError): continue - + timeline_insights = { "overdue_items": overdue_count, "upcoming_deadlines_7_days": upcoming_deadlines } - + # Key metrics key_metrics = { "completion_rate": round((completed_count / total_count) * 100, 1) if total_count > 0 else 0, "active_work_ratio": round(((in_progress_count + completed_count) / total_count) * 100, 1) if total_count > 0 else 0, "team_members": len([k for k in assignee_workload.keys() if k != "Unassigned"]) } - + return { "total_count": total_count, "status_distribution": status_distribution, @@ -113,11 +113,11 @@ def analyze_work_packages(work_packages: List[WorkPackage]) -> Dict[str, Any]: class ProjectStatusReportTemplate: """Template for generating project status reports.""" - + @staticmethod def get_default_template() -> str: """Get the default project status report template. - + Returns: Template string for LLM prompt """ @@ -172,55 +172,55 @@ def get_default_template() -> str: Format the report in a professional, clear, and actionable manner. Use bullet points and structured sections for easy readability. Focus on insights that would be valuable for project managers and stakeholders. """ - + @staticmethod def format_work_packages_summary(work_packages: List[WorkPackage], limit: int = 10) -> str: """Format work packages into a summary for the report. - + Args: work_packages: List of work packages limit: Maximum number of work packages to include in detail - + Returns: Formatted string summary """ if not work_packages: return "No work packages found for this project." 
- + summary_lines = [] - + # Show top work packages (by priority or recent updates) sorted_packages = sorted( - work_packages, + work_packages, key=lambda wp: ( wp.priority.get("id", 0) if wp.priority else 0, wp.updated_at - ), + ), reverse=True ) - + summary_lines.append(f"Top {min(limit, len(work_packages))} Work Packages:") - + for i, wp in enumerate(sorted_packages[:limit], 1): status_name = wp.status.get("name", "Unknown") if wp.status else "Unknown" priority_name = wp.priority.get("name", "Normal") if wp.priority else "Normal" assignee_name = wp.assignee.get("name", "Unassigned") if wp.assignee else "Unassigned" completion = wp.done_ratio if wp.done_ratio is not None else 0 - + summary_lines.append( f"{i}. [{wp.id}] {wp.subject}\n" f" Status: {status_name} | Priority: {priority_name} | " f"Assignee: {assignee_name} | Progress: {completion}%" ) - + if wp.due_date: summary_lines.append(f" Due Date: {wp.due_date}") - + if len(work_packages) > limit: summary_lines.append(f"\n... and {len(work_packages) - limit} more work packages") - + return "\n".join(summary_lines) - + @staticmethod def create_report_prompt( project_id: str, @@ -229,24 +229,24 @@ def create_report_prompt( analysis: Dict[str, Any] ) -> str: """Create the complete prompt for LLM report generation. 
- + Args: project_id: Project identifier openproject_base_url: Base URL of OpenProject instance work_packages: List of work packages analysis: Analysis results from ProjectReportAnalyzer - + Returns: Complete formatted prompt string """ template = ProjectStatusReportTemplate.get_default_template() - + # Format analysis data as JSON for better structure analysis_json = json.dumps(analysis, indent=2, default=str) - + # Create work packages summary work_packages_summary = ProjectStatusReportTemplate.format_work_packages_summary(work_packages) - + return template.format( project_id=project_id, openproject_base_url=openproject_base_url, @@ -255,7 +255,7 @@ def create_report_prompt( analysis_data=analysis_json, work_packages_summary=work_packages_summary ) - + @staticmethod def create_enhanced_report_prompt( project_id: str, @@ -263,10 +263,10 @@ def create_enhanced_report_prompt( openproject_base_url: str, work_packages: List[WorkPackage], analysis: Dict[str, Any], - pmflex_context: str + pmflex_context: str = "" ) -> str: """Create an enhanced prompt with PMFlex RAG context. 
- + Args: project_id: Project identifier project_type: Type of project (portfolio, program, project) @@ -274,18 +274,18 @@ def create_enhanced_report_prompt( work_packages: List of work packages analysis: Analysis results from ProjectReportAnalyzer pmflex_context: PMFlex context from RAG system - + Returns: Complete formatted prompt string with RAG enhancement """ template = ProjectStatusReportTemplate.get_enhanced_template() - + # Format analysis data as JSON for better structure analysis_json = json.dumps(analysis, indent=2, default=str) - + # Create work packages summary work_packages_summary = ProjectStatusReportTemplate.format_work_packages_summary(work_packages) - + return template.format( project_id=project_id, project_type=project_type, @@ -296,11 +296,11 @@ def create_enhanced_report_prompt( work_packages_summary=work_packages_summary, pmflex_context=pmflex_context or "No PMFlex context available." ) - + @staticmethod def get_enhanced_template() -> str: """Get the enhanced project status report template with PMFlex context. - + Returns: Template string for LLM prompt with RAG enhancement """ @@ -381,20 +381,20 @@ def get_enhanced_template() -> str: The report should reflect PMFlex principles of transparency, accountability, and systematic project management approach used in German federal administration. Prioritize clarity and actionable information for project stakeholders and governance bodies. """ - + @staticmethod def get_custom_template(template_name: str) -> str: """Get a custom report template by name. - + This method can be extended to support multiple report templates for different use cases or organizations. 
- + Args: template_name: Name of the template to retrieve - + Returns: Template string - + Raises: ValueError: If template name is not found """ @@ -403,12 +403,12 @@ def get_custom_template(template_name: str) -> str: "executive": ProjectStatusReportTemplate._get_executive_template(), "detailed": ProjectStatusReportTemplate._get_detailed_template() } - + if template_name not in templates: raise ValueError(f"Template '{template_name}' not found. Available templates: {list(templates.keys())}") - + return templates[template_name] - + @staticmethod def _get_executive_template() -> str: """Executive-focused template with high-level insights.""" @@ -429,7 +429,7 @@ def _get_executive_template() -> str: Keep the report concise and focused on decision-making insights. """ - + @staticmethod def _get_detailed_template() -> str: """Detailed template for comprehensive analysis."""