From fa4e1a41e574f9fd3dba8ccec595d10e4e66551f Mon Sep 17 00:00:00 2001 From: Charith Nuwan Bimsara <59943919+nuwangeek@users.noreply.github.com> Date: Tue, 10 Feb 2026 12:19:34 +0530 Subject: [PATCH] Custom prompt configuration feature and Pyright fixes (#297) * updated docker compose ec2 * integrate streaming endpoint with test prodction connection page * formatted response with markdown * fe logic for the encryption * vault secret update after fixing issues * fixed formatting issue * integration with be * update cron manager vault script * tested integration of vault security update * fix security issues * creation success model changes * clean vite config generated files * fixed issue references are not sending with streming tokens * complete #192 and #206 bug fixes * production inference display logic change * change production inference display logic * fixed requested issue * Refactor Docker Compose configuration for vault agents and update CSP settings * Remove obsolete Vite configuration files and associated plugins * prompt coniguration backend to be testing * custom prompt configuration update and fixed Pyright issues * fixed copilot reviews * fixed review comments --------- Co-authored-by: Thiru Dinesh <56014038+Thirunayan22@users.noreply.github.com> Co-authored-by: Thiru Dinesh Co-authored-by: erangi-ar Co-authored-by: erangi-ar <111747955+erangi-ar@users.noreply.github.com> --- .../POST/prompt-configuration/save.yml | 23 +- .../llm-connections/prompts/get-prompt.yml | 34 ++ constants.ini | 1 + docs/CUSTOM_PROMPT_CONFIGURATION.md | 371 ++++++++++++++++ .../contextual_retriever.py | 16 +- src/guardrails/dspy_nemo_adapter.py | 9 +- src/guardrails/nemo_rails_adapter.py | 52 ++- src/llm_orchestration_service.py | 111 ++++- src/llm_orchestration_service_api.py | 128 +++++- src/llm_orchestrator_config/exceptions.py | 6 +- .../llm_ochestrator_constants.py | 6 + src/optimization/metrics/generator_metrics.py | 8 +- src/response_generator/response_generate.py | 34 +- src/utils/prompt_config_loader.py | 414 ++++++++++++++++++ 14 files changed, 1152 insertions(+), 61 deletions(-) create mode 100644 DSL/Ruuter.public/rag-search/POST/llm-connections/prompts/get-prompt.yml create mode 100644 docs/CUSTOM_PROMPT_CONFIGURATION.md create mode 100644 src/utils/prompt_config_loader.py diff --git a/DSL/Ruuter.private/rag-search/POST/prompt-configuration/save.yml b/DSL/Ruuter.private/rag-search/POST/prompt-configuration/save.yml index ad90875d..84b68fdb 100644 --- a/DSL/Ruuter.private/rag-search/POST/prompt-configuration/save.yml +++ b/DSL/Ruuter.private/rag-search/POST/prompt-configuration/save.yml @@ -38,7 +38,7 @@ update_prompt: id: ${existing_prompt.response.body[0].id} prompt: ${prompt} result: update_result - next: return_update_success + next: refresh_llm_cache insert_prompt: call: http.post @@ -47,6 +47,25 @@ insert_prompt: body: prompt: ${prompt} result: insert_result + next: refresh_llm_cache + +refresh_llm_cache: + call: http.post + args: + url: "[#RAG_SEARCH_PROMPT_REFRESH]" + body: {} + result: refresh_result + next: check_operation_type + on_error: handle_refresh_error + +handle_refresh_error: + log: "Prompt refresh failed, will use TTL cache fallback" + next: check_operation_type + +check_operation_type: + switch: + - condition: "${update_result != null}" + next: return_update_success next: return_insert_success return_update_success: @@ -55,4 +74,4 @@ return_update_success: return_insert_success: return: ${insert_result.response.body[0]} - next: end + next: end \ No newline at end of 
file diff --git a/DSL/Ruuter.public/rag-search/POST/llm-connections/prompts/get-prompt.yml b/DSL/Ruuter.public/rag-search/POST/llm-connections/prompts/get-prompt.yml new file mode 100644 index 00000000..125aa7ff --- /dev/null +++ b/DSL/Ruuter.public/rag-search/POST/llm-connections/prompts/get-prompt.yml @@ -0,0 +1,34 @@ +declaration: + call: declare + version: 0.1 + description: "Get custom prompt configuration from database" + method: post + accepts: json + returns: json + namespace: rag-search + +get_prompt_configuration: + call: http.get + args: + url: "[#RAG_SEARCH_RESQL]/get-prompt-configuration" + result: prompt_result + next: check_prompt_exists + +check_prompt_exists: + switch: + - condition: "${prompt_result.response.body.length > 0}" + next: return_result + next: return_empty + +return_result: + return: ${prompt_result.response.body[0]} + next: end + +return_empty: + assign: + emptyData: {} + next: return_empty_response + +return_empty_response: + return: ${emptyData} + next: end diff --git a/constants.ini b/constants.ini index bc09e038..63172d15 100644 --- a/constants.ini +++ b/constants.ini @@ -7,5 +7,6 @@ RAG_SEARCH_PROJECT_LAYER=rag-search RAG_SEARCH_TIM=http://tim:8085 RAG_SEARCH_CRON_MANAGER=http://cron-manager:9010 RAG_SEARCH_LLM_ORCHESTRATOR=http://llm-orchestration-service:8100/orchestrate +RAG_SEARCH_PROMPT_REFRESH=http://llm-orchestration-service:8100/prompt-config/refresh DOMAIN=localhost DB_PASSWORD=dbadmin \ No newline at end of file diff --git a/docs/CUSTOM_PROMPT_CONFIGURATION.md b/docs/CUSTOM_PROMPT_CONFIGURATION.md new file mode 100644 index 00000000..8a7f94ef --- /dev/null +++ b/docs/CUSTOM_PROMPT_CONFIGURATION.md @@ -0,0 +1,371 @@ +# Custom Prompt Configuration Flow + +## Overview + +The custom prompt configuration system allows admins to configure prompts via UI that automatically apply to all response generation operations. Changes are cached with a 5-minute TTL and can be immediately refreshed when updated. + +--- + +## Architecture Components + +### 1. **Database Layer** +- **Table**: `public.prompt_configuration` +- **Columns**: `id` (BIGINT), `prompt` (TEXT) +- Stores the custom prompt text configured by admins + +### 2. **Ruuter DSL Endpoints** +- **Get Prompt**: `DSL/Ruuter.public/rag-search/POST/llm-connections/prompts/get-prompt.yml` + - Fetches prompt from database via Resql + - Returns prompt data or empty object + +- **Save Prompt**: `DSL/Ruuter.private/rag-search/POST/prompt-configuration/save.yml` + - Updates/inserts prompt in database + - Automatically triggers cache refresh after save + +### 3. **Python Components** +- **PromptConfigurationLoader** (`src/utils/prompt_config_loader.py`) + - HTTP client to fetch prompts via Ruuter + - 5-minute TTL cache with thread safety + - Retry logic (3 attempts, exponential backoff) + - Force refresh capability + +- **LLMOrchestrationService** (`src/llm_orchestration_service.py`) + - Initializes loader at startup + - Formats custom instructions with wrapper tags + - Passes to ResponseGeneratorAgent + +- **ResponseGeneratorAgent** (`src/response_generator/response_generate.py`) + - Accepts `custom_instructions_prefix` parameter + - Prepends custom instructions to user questions + - Applied in both streaming and non-streaming modes + +### 4. 
**API Endpoints** +- **`POST /orchestrate`** - Standard request flow +- **`POST /orchestrate/test`** - Test request flow +- **`POST /orchestrate/stream`** - Streaming request flow +- **`POST /prompt-config/refresh`** - Force cache refresh + +--- + +## Flow Diagram + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ ADMIN UPDATES PROMPT IN UI │ +└────────────────┬────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ Ruuter: save.yml │ +│ 1. Update/Insert in PostgreSQL │ +│ 2. Call POST /prompt-config/refresh │ +└────────────────┬────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ FastAPI: /prompt-config/refresh │ +│ - PromptConfigurationLoader.force_refresh() │ +│ - Invalidates cache immediately │ +└────────────────┬────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ Cache Updated - Ready for Next Request │ +└─────────────────────────────────────────────────────────────────┘ + +╔═════════════════════════════════════════════════════════════════╗ +║ USER SENDS MESSAGE ║ +╚════════════════┬════════════════════════════════════════════════╝ + │ + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ FastAPI: /orchestrate, /orchestrate/test, or /orchestrate/stream│ +└────────────────┬────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ LLMOrchestrationService.process_orchestration_request() │ +│ or stream_orchestration_response() │ +└────────────────┬────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ _initialize_service_components() │ +│ ↓ │ +│ _safe_initialize_response_generator() │ +│ ↓ │ +│ _initialize_response_generator() │ +└────────────────┬────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ _get_custom_instructions_for_response_generation() │ +│ ↓ │ +│ prompt_config_loader.get_custom_instructions() │ +│ - Returns from cache if valid (< 5 min old) │ +│ - OR fetches via Ruuter if expired │ +└────────────────┬────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ Format custom instructions: │ +│ "[SYSTEM INSTRUCTIONS]\n{prompt}\n\n[USER QUESTION]\n" │ +└────────────────┬────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ ResponseGeneratorAgent(custom_instructions_prefix=prefix) │ +└────────────────┬────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ ResponseGeneratorAgent.forward() or stream_response() │ +│ - Prepends custom_instructions_prefix to user question │ +│ - Modified question = "{prefix}{user_question}" │ +└────────────────┬────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ DSPy Predictor receives modified question │ +│ - Custom instructions guide response generation │ +│ - LLM follows configured rules (language, tone, format, etc.) 
│ +└────────────────┬────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ Response returned to user │ +│ - Follows custom prompt configuration │ +│ - Language policy applied │ +│ - Formatting rules applied │ +│ - Safety guidelines applied │ +└─────────────────────────────────────────────────────────────────┘ +``` + +--- + +## Detailed Step-by-Step Flow + +### **Startup Phase** +1. **Service Initialization** (`LLMOrchestrationService.__init__`) + - Creates `PromptConfigurationLoader` instance + - Warms up cache by calling `get_custom_instructions()` + - Logs success: "Custom prompt configuration loaded at startup (X chars)" + - Logs if not found: "ℹNo custom prompt configuration found - using defaults" + +### **Admin Updates Prompt** +1. **UI Save Action** + - Admin edits prompt text in UI + - Submits save request + +2. **Ruuter Processing** (`save.yml`) + - Checks if prompt exists in database + - Updates existing or inserts new prompt + - Calls `POST /prompt-config/refresh` endpoint + +3. **Cache Invalidation** (`/prompt-config/refresh`) + - `force_refresh()` clears cache immediately + - Fetches new prompt from Ruuter + - Returns success status with prompt length and content hash (no preview for security) + +### **User Request Processing** +1. **Request Received** (Any of 3 endpoints) + - `/orchestrate` - Standard response + - `/orchestrate/test` - Test response + - `/orchestrate/stream` - Streaming response + +2. **Service Components Initialization** + - LLM Manager initialized + - Contextual Retriever initialized + - **Response Generator initialized** ← Custom prompt applied here + +3. **Custom Instructions Loading** + ```python + custom_prefix = self._get_custom_instructions_for_response_generation() + # Returns: "[SYSTEM INSTRUCTIONS]\n{prompt}\n\n[USER QUESTION]\n" + ``` + +4. **Response Generator Creation** + ```python + ResponseGeneratorAgent(custom_instructions_prefix=custom_prefix) + ``` + +5. **Question Modification** + ```python + # In forward() or stream_response() + modified_question = f"{user_question}{custom_instructions_prefix}" + ``` + +6. **LLM Processing** + - DSPy predictor receives modified question + - Custom instructions guide response behavior + - Response generated following configured rules + +--- + +## Cache Behavior + +### **TTL Cache (5 minutes)** +- **Cache Hit**: Returns immediately from memory (fast) +- **Cache Miss**: Fetches via HTTP from Ruuter (slower, ~100-500ms) +- **Stale Fallback**: If fetch fails, returns last known good value + +### **Force Refresh** +- Triggered by admin save action +- Bypasses cache TTL +- Ensures immediate propagation of changes + +### **Thread Safety** +- Uses `threading.Lock()` for concurrent requests +- Single fetch for multiple simultaneous requests +- Cache shared across all requests + +--- + +## Configuration + +### **Constants** (`src/llm_orchestrator_config/llm_ochestrator_constants.py`) +```python +RUUTER_PROMPT_CONFIG_ENDPOINT = ( + "http://ruuter-public:8086/rag-search/llm-connections/prompts/get-prompt" +) +PROMPT_CONFIG_CACHE_TTL = 300 # 5 minutes cache +``` + +### **Environment Variables** (`constants.ini`) +```ini +RAG_SEARCH_PROMPT_REFRESH=http://llm-orchestration-service:8100/prompt-config/refresh +``` + +--- + +## Testing + +### **1. Insert Test Prompt** +```sql +INSERT INTO public.prompt_configuration (id, prompt) +VALUES (1, 'Always respond in Estonian language. 
Be professional and concise.') +ON CONFLICT (id) DO UPDATE SET prompt = EXCLUDED.prompt; +``` + +### **2. Test via API** +```bash +curl -X POST http://localhost:8100/orchestrate/test \ + -H "Content-Type: application/json" \ + -d '{ + "message": "What is artificial intelligence?", + "environment": "development", + "connectionId": 1 + }' +``` + +### **3. Update Prompt** +```sql +UPDATE public.prompt_configuration +SET prompt = 'Provide concise answers using bullet points. Be helpful and clear.' +WHERE id = 1; +``` + +### **4. Verify Immediate Refresh** +- Check logs for: "Prompt configuration cache refreshed successfully" +- Test same question - response format should change immediately + +### **5. Check Cache Status** +```bash +# Manual refresh (optional) +curl -X POST http://localhost:8100/prompt-config/refresh +``` + +**Response:** +```json +{ + "refreshed": true, + "message": "Prompt configuration refreshed successfully", + "prompt_length": 245, + "content_hash": "a3f5b8c9e1d2f4a6" +} +``` +**Note:** For security, the endpoint returns only the prompt length and a SHA-256 hash (not the actual prompt content). + +--- + +## Key Features + +✅ **TTL Caching** - 5-minute cache reduces database calls +✅ **Immediate Updates** - Admin changes trigger instant refresh +✅ **Graceful Degradation** - If refresh fails, TTL cache continues working +✅ **Thread-Safe** - Multiple concurrent requests handled safely +✅ **Retry Logic** - 3 attempts with exponential backoff for HTTP failures +✅ **Instruction Prepending** - Preserves DSPy optimization compatibility +✅ **Applied Consistently** - Works across all 3 orchestration endpoints +✅ **Applied to ResponseGenerator Only** - Not applied to PromptRefinerAgent + +--- + +## Example + +**Database Prompt:** +``` +Always respond in Estonian language. Be professional and concise. +When answering, prioritize accuracy and cite sources when available. +``` + +**What DSPy Receives:** +``` +[SYSTEM INSTRUCTIONS] +Always respond in Estonian language. Be professional and concise. +When answering, prioritize accuracy and cite sources when available. + +[USER QUESTION] +What is DigiDoc and how can I use it? + +Context: [retrieved documentation chunks...] 
+``` + +**Expected Response:** +- In Estonian language ✅ +- Professional tone ✅ +- Concise format ✅ +- Citations included ✅ + +--- + +## Files Modified + +| File | Purpose | +|------|---------| +| `src/utils/prompt_config_loader.py` | HTTP loader with caching and retry | +| `src/llm_orchestration_service.py` | Initialize loader, format instructions | +| `src/llm_orchestration_service_api.py` | Refresh endpoint | +| `src/response_generator/response_generate.py` | Accept and apply custom prefix | +| `DSL/Ruuter.public/rag-search/POST/llm-connections/prompts/get-prompt.yml` | Fetch prompt endpoint | +| `DSL/Ruuter.private/rag-search/POST/prompt-configuration/save.yml` | Save with refresh trigger | +| `src/llm_orchestrator_config/llm_ochestrator_constants.py` | Configuration constants | +| `constants.ini` | Refresh endpoint URL | + +--- + +## Troubleshooting + +### **Prompt Not Applied** +- Check logs for: "Custom prompt configuration loaded at startup" +- Verify database has prompt: `SELECT * FROM public.prompt_configuration;` +- Test refresh endpoint: `curl -X POST http://localhost:8100/prompt-config/refresh` + +### **Cache Not Refreshing** +- Check Ruuter save.yml calls refresh endpoint +- Verify `RAG_SEARCH_PROMPT_REFRESH` constant in constants.ini +- Check logs for refresh success/failure + +### **Empty Prompt** +- Check Ruuter endpoint returns correct format +- Verify response unwrapping logic in loader +- Check logs for "No prompt configuration found in database; caching empty result" + +--- + +## Notes + +- Custom prompts apply **only to ResponseGeneratorAgent** (not PromptRefinerAgent) +- PromptRefiner focuses on query optimization for retrieval +- ResponseGenerator needs language policy and interaction style for user-facing content +- This design preserves DSPy optimization compatibility by using instruction prepending instead of signature modification diff --git a/src/contextual_retrieval/contextual_retriever.py b/src/contextual_retrieval/contextual_retriever.py index 8ab5d242..b6d4699b 100644 --- a/src/contextual_retrieval/contextual_retriever.py +++ b/src/contextual_retrieval/contextual_retriever.py @@ -206,18 +206,20 @@ async def retrieve_contextual_chunks( semantic_task, bm25_task, return_exceptions=True ) - # Handle exceptions and assign results - if isinstance(search_results[0], Exception): - logger.error(f"Semantic search failed: {search_results[0]}") + # Handle exceptions and assign results with proper type narrowing + semantic_result = search_results[0] + if isinstance(semantic_result, BaseException): + logger.error(f"Semantic search failed: {semantic_result}") semantic_results = [] else: - semantic_results = search_results[0] + semantic_results = semantic_result - if isinstance(search_results[1], Exception): - logger.error(f"BM25 search failed: {search_results[1]}") + bm25_result = search_results[1] + if isinstance(bm25_result, BaseException): + logger.error(f"BM25 search failed: {bm25_result}") bm25_results = [] else: - bm25_results = search_results[1] + bm25_results = bm25_result else: # Sequential execution semantic_results = await self._semantic_search( diff --git a/src/guardrails/dspy_nemo_adapter.py b/src/guardrails/dspy_nemo_adapter.py index 630b2657..488d1f47 100644 --- a/src/guardrails/dspy_nemo_adapter.py +++ b/src/guardrails/dspy_nemo_adapter.py @@ -14,6 +14,7 @@ AsyncCallbackManagerForLLMRun, ) from langchain_core.language_models.llms import LLM +from langchain_core.outputs import GenerationChunk from src.guardrails.guardrails_llm_configs import TEMPERATURE, 
MAX_TOKENS, MODEL_NAME @@ -191,7 +192,7 @@ def _stream( stop: Optional[List[str]] = None, run_manager: Optional[CallbackManagerForLLMRun] = None, **kwargs: Any, - ) -> Iterator[str]: + ) -> Iterator[GenerationChunk]: """ Synchronous streaming via DSPy's native streaming support. @@ -227,7 +228,7 @@ def _stream( if token: if run_manager: run_manager.on_llm_new_token(token) - yield token + yield GenerationChunk(text=token) except Exception as e: logger.error(f"Error in DSPyNeMoLLM._stream: {str(e)}") @@ -239,7 +240,7 @@ async def _astream( stop: Optional[List[str]] = None, run_manager: Optional[AsyncCallbackManagerForLLMRun] = None, **kwargs: Any, - ) -> AsyncIterator[str]: + ) -> AsyncIterator[GenerationChunk]: """ Async streaming using Threaded Producer / Async Consumer pattern. @@ -316,7 +317,7 @@ def producer(): if token: if run_manager: await run_manager.on_llm_new_token(token) - yield token + yield GenerationChunk(text=token) except Exception as e: logger.error(f"Error in DSPyNeMoLLM._astream: {str(e)}") diff --git a/src/guardrails/nemo_rails_adapter.py b/src/guardrails/nemo_rails_adapter.py index ecbd1b33..1ae38982 100644 --- a/src/guardrails/nemo_rails_adapter.py +++ b/src/guardrails/nemo_rails_adapter.py @@ -1,10 +1,11 @@ -from typing import Any, Dict, Optional, AsyncIterator +from typing import Any, Dict, Optional, AsyncIterator, cast, Type import asyncio from loguru import logger from pydantic import BaseModel, Field from nemoguardrails import LLMRails, RailsConfig from nemoguardrails.llm.providers import register_llm_provider +from langchain_core.language_models.llms import BaseLLM from src.llm_orchestrator_config.llm_ochestrator_constants import ( GUARDRAILS_BLOCKED_PHRASES, ) @@ -65,9 +66,14 @@ def _register_custom_provider(self) -> None: logger.info("Registering DSPy custom LLM provider with NeMo Guardrails") - provider_factory = DSPyLLMProviderFactory() - - register_llm_provider("dspy-custom", provider_factory) + # NeMo Guardrails' register_llm_provider accepts callable factories at runtime. + # We instantiate DSPyLLMProviderFactory first, then register the instance. + # The factory instance implements __call__ to return DSPyNeMoLLM instances + # (which properly inherit from BaseLLM). This ensures NeMo can call the factory + # without trying to instantiate it with config kwargs that __init__ doesn't accept. + # We use cast to satisfy the type checker while maintaining runtime correctness. 
+ factory = DSPyLLMProviderFactory() + register_llm_provider("dspy-custom", cast(Type[BaseLLM], factory)) logger.info("DSPy custom LLM provider registered successfully") except Exception as e: @@ -260,12 +266,17 @@ def _get_input_check_prompt(self, user_input: str) -> str: raise RuntimeError("Rails config not available") # Find the self_check_input prompt - for prompt in self._rails.config.prompts: - if prompt.task == "self_check_input": - # Replace the template variable with actual content - prompt_text = prompt.content.replace("{{ user_input }}", user_input) - logger.debug("Found self_check_input prompt in NeMo config") - return prompt_text + if self._rails.config.prompts: + for prompt in self._rails.config.prompts: + if prompt.task == "self_check_input": + # Ensure content is not None before calling replace + if prompt.content: + # Replace the template variable with actual content + prompt_text = prompt.content.replace( + "{{ user_input }}", user_input + ) + logger.debug("Found self_check_input prompt in NeMo config") + return prompt_text # Fallback if prompt not found in config logger.warning( @@ -503,14 +514,19 @@ def _get_output_check_prompt(self, bot_response: str) -> str: raise RuntimeError("Rails config not available") # Find the self_check_output prompt - for prompt in self._rails.config.prompts: - if prompt.task == "self_check_output": - # Replace the template variable with actual content - prompt_text = prompt.content.replace( - "{{ bot_response }}", bot_response - ) - logger.debug("Found self_check_output prompt in NeMo config") - return prompt_text + if self._rails.config.prompts: + for prompt in self._rails.config.prompts: + if prompt.task == "self_check_output": + # Ensure content is not None before calling replace + if prompt.content: + # Replace the template variable with actual content + prompt_text = prompt.content.replace( + "{{ bot_response }}", bot_response + ) + logger.debug( + "Found self_check_output prompt in NeMo config" + ) + return prompt_text # Fallback if prompt not found in config logger.warning( diff --git a/src/llm_orchestration_service.py b/src/llm_orchestration_service.py index 49b307d8..05303c26 100644 --- a/src/llm_orchestration_service.py +++ b/src/llm_orchestration_service.py @@ -39,6 +39,8 @@ TEST_DEPLOYMENT_ENVIRONMENT, STREAM_TOKEN_LIMIT_MESSAGE, PRODUCTION_DEPLOYMENT_ENVIRONMENT, + RUUTER_PROMPT_CONFIG_ENDPOINT, + PROMPT_CONFIG_CACHE_TTL, ) from src.llm_orchestrator_config.stream_config import StreamConfig from src.vector_indexer.constants import ResponseGenerationConstants @@ -49,6 +51,7 @@ from src.utils.budget_tracker import get_budget_tracker from src.utils.production_store import get_production_store from src.utils.language_detector import detect_language, get_language_name +from src.utils.prompt_config_loader import PromptConfigurationLoader from src.guardrails import NeMoRailsAdapter, GuardrailCheckResult from src.contextual_retrieval import ContextualRetriever from src.llm_orchestrator_config.exceptions import ( @@ -100,6 +103,29 @@ def __init__(self) -> None: """Initialize the orchestration service.""" self.langfuse_config = LangfuseConfig() + # Initialize prompt configuration loader + self.prompt_config_loader = PromptConfigurationLoader( + ruuter_endpoint=RUUTER_PROMPT_CONFIG_ENDPOINT, + cache_ttl_seconds=PROMPT_CONFIG_CACHE_TTL, + max_retries=3, + timeout_seconds=10, + ) + + try: + custom_instructions = self.prompt_config_loader.get_custom_instructions() + if custom_instructions: + logger.info( + f"Custom prompt configuration loaded 
at startup " + f"({len(custom_instructions)} chars)" + ) + else: + logger.info("ℹNo custom prompt configuration found - using defaults") + except Exception as e: + logger.warning( + f"Failed to load custom prompts at startup: {e}. " + f"Service will continue with default behavior." + ) + @observe(name="orchestration_request", as_type="agent") def process_orchestration_request( self, request: OrchestrationRequest @@ -141,7 +167,8 @@ def process_orchestration_request( ) # Store detected language in request for use throughout pipeline - request._detected_language = detected_language + # Using setattr for type safety - adds dynamic attribute to Pydantic model instance + setattr(request, "_detected_language", detected_language) # Initialize all service components components = self._initialize_service_components(request) @@ -269,7 +296,8 @@ async def stream_orchestration_response( ) # Store detected language in request for use throughout pipeline - request._detected_language = detected_language + # Using setattr for type safety - adds dynamic attribute to Pydantic model instance + setattr(request, "_detected_language", detected_language) # Use StreamManager for centralized tracking and guaranteed cleanup async with stream_manager.managed_stream( @@ -923,7 +951,7 @@ def _execute_orchestration_pipeline( components: Dict[str, Any], costs_dict: Dict[str, Dict[str, Any]], timing_dict: Dict[str, float], - ) -> OrchestrationResponse: + ) -> Union[OrchestrationResponse, TestOrchestrationResponse]: """Execute the main orchestration pipeline with all components.""" # Step 1: Input Guardrails Check if components["guardrails_adapter"]: @@ -977,17 +1005,22 @@ def _execute_orchestration_pipeline( timing_dict["response_generation"] = time.time() - start_time # Step 5: Output Guardrails Check + # Apply guardrails to all response types for consistent safety across all environments start_time = time.time() output_guardrails_response = self.handle_output_guardrails( - components["guardrails_adapter"], generated_response, request, costs_dict + components["guardrails_adapter"], + generated_response, + request, + costs_dict, ) timing_dict["output_guardrails_check"] = time.time() - start_time # Step 6: Store inference data (for production and testing environments) + # Only store OrchestrationResponse (has chatId), not TestOrchestrationResponse if request.environment in [ PRODUCTION_DEPLOYMENT_ENVIRONMENT, TEST_DEPLOYMENT_ENVIRONMENT, - ]: + ] and isinstance(output_guardrails_response, OrchestrationResponse): try: self._store_production_inference_data( request=request, @@ -1175,16 +1208,21 @@ async def _safe_retrieve_contextual_chunks( def handle_output_guardrails( self, guardrails_adapter: Optional[NeMoRailsAdapter], - generated_response: OrchestrationResponse, + generated_response: Union[OrchestrationResponse, TestOrchestrationResponse], request: OrchestrationRequest, costs_dict: Dict[str, Dict[str, Any]], - ) -> OrchestrationResponse: - """Check output guardrails and handle blocked responses.""" - if ( + ) -> Union[OrchestrationResponse, TestOrchestrationResponse]: + """Check output guardrails and handle blocked responses for both response types.""" + # Determine if we should run guardrails (same logic for both response types) + should_check_guardrails = ( guardrails_adapter is not None and generated_response.llmServiceActive and not generated_response.questionOutOfLLMScope - ): + ) + + if should_check_guardrails: + # Type assertion: should_check_guardrails guarantees guardrails_adapter is not None + assert 
guardrails_adapter is not None output_check_result = self._check_output_guardrails( guardrails_adapter=guardrails_adapter, assistant_message=generated_response.content, @@ -1201,13 +1239,23 @@ def handle_output_guardrails( OUTPUT_GUARDRAIL_VIOLATION_MESSAGES, detected_lang ) - return OrchestrationResponse( - chatId=request.chatId, - llmServiceActive=True, - questionOutOfLLMScope=False, - inputGuardFailed=False, - content=localized_msg, - ) + # Return appropriate response type based on original response type + if isinstance(generated_response, TestOrchestrationResponse): + return TestOrchestrationResponse( + llmServiceActive=True, + questionOutOfLLMScope=False, + inputGuardFailed=False, + content=localized_msg, + chunks=None, + ) + else: + return OrchestrationResponse( + chatId=request.chatId, + llmServiceActive=True, + questionOutOfLLMScope=False, + inputGuardFailed=False, + content=localized_msg, + ) logger.info("Output guardrails check passed") else: @@ -2002,9 +2050,14 @@ def _initialize_response_generator( logger.info("Initializing response generator") try: + # Get custom instructions for response generation + custom_prefix = self._get_custom_instructions_for_response_generation() + # Set up DSPy configuration for the response generator with llm_manager.use_task_local(): - response_generator = ResponseGeneratorAgent() + response_generator = ResponseGeneratorAgent( + custom_instructions_prefix=custom_prefix + ) logger.info("Response generator initialized successfully") return response_generator @@ -2013,6 +2066,28 @@ def _initialize_response_generator( logger.error(f"Failed to initialize response generator: {str(e)}") raise + def _get_custom_instructions_for_response_generation(self) -> str: + """ + Get custom prompt instructions for response generation only. + + Note: Applied only to ResponseGeneratorAgent, not PromptRefinerAgent. + PromptRefiner focuses on query optimization for retrieval, while + ResponseGenerator needs to follow language policy and interaction style + for user-facing content. 
+ + Returns: + str: Custom instruction prefix for prepending to questions + """ + try: + custom_prompt = self.prompt_config_loader.get_custom_instructions() + if custom_prompt: + # Format for prepending to questions in ResponseGenerator + return f"[SYSTEM INSTRUCTIONS]\n{custom_prompt}\n\n[USER QUESTION]\n" + return "" + except Exception as e: + logger.error(f"Error retrieving custom instructions: {e}") + return "" + @staticmethod def _format_chunks_for_test_response( relevant_chunks: Optional[List[Dict[str, Union[str, float, Dict[str, Any]]]]], diff --git a/src/llm_orchestration_service_api.py b/src/llm_orchestration_service_api.py index b58eac94..3ed24ce2 100644 --- a/src/llm_orchestration_service_api.py +++ b/src/llm_orchestration_service_api.py @@ -30,6 +30,7 @@ from src.utils.stream_timeout import stream_timeout from src.utils.error_utils import generate_error_id, log_error_with_context from src.utils.rate_limiter import RateLimiter +from src.utils.prompt_config_loader import RefreshStatus from models.request_models import ( OrchestrationRequest, OrchestrationResponse, @@ -276,7 +277,7 @@ def orchestrate_llm_request( raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal server error occurred", - ) + ) from e @app.post( @@ -658,7 +659,7 @@ async def create_embeddings(request: EmbeddingRequest) -> EmbeddingResponse: "error": "Embedding creation failed", "retry_after": 30, }, - ) + ) from e @app.post("/generate-context", response_model=ContextGenerationResponse) @@ -679,7 +680,7 @@ async def generate_context_with_caching( except Exception as e: error_id = generate_error_id() log_error_with_context(logger, error_id, "context_generation_endpoint", None, e) - raise HTTPException(status_code=500, detail="Context generation failed") + raise HTTPException(status_code=500, detail="Context generation failed") from e @app.get("/embedding-models") @@ -715,7 +716,128 @@ async def get_available_embedding_models( ) raise HTTPException( status_code=500, detail="Failed to retrieve embedding models" + ) from e + + +@app.post("/prompt-config/refresh") +def refresh_prompt_config(http_request: Request) -> Dict[str, Any]: + """ + Force immediate refresh of prompt configuration cache. + + This endpoint is called by Ruuter after admin updates the prompt configuration + in the database, ensuring the changes are reflected immediately without waiting + for the cache TTL to expire. 
+ + Returns: + Dictionary with refresh status and message + + Raises: + HTTPException (503): If prompt configuration loader is not initialized + HTTPException (404): If no prompt configuration found in database + HTTPException (500): If refresh operation fails + """ + orchestration_service = http_request.app.state.orchestration_service + + # Check if loader is initialized + if not orchestration_service or not hasattr( + orchestration_service, "prompt_config_loader" + ): + error_id = generate_error_id() + logger.error(f"[{error_id}] Prompt configuration loader not initialized") + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail={ + "error": "Prompt configuration loader not initialized", + "error_id": error_id, + }, + ) + + try: + # Use new method that returns detailed status + refresh_result = ( + orchestration_service.prompt_config_loader.force_refresh_with_status() ) + refresh_status = refresh_result.get("status") + + if refresh_status == RefreshStatus.SUCCESS: + # Success - configuration loaded + logger.info("Prompt configuration refreshed successfully") + return { + "refreshed": True, + "message": refresh_result.get("message"), + "prompt_length": refresh_result.get("length"), + } + + elif refresh_status == RefreshStatus.NOT_FOUND: + # Configuration absent in database + error_id = generate_error_id() + logger.warning(f"[{error_id}] Prompt configuration not found in database") + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail={ + "error": refresh_result.get("message"), + "error_id": error_id, + }, + ) + + elif refresh_status == RefreshStatus.FETCH_FAILED: + # Upstream service failure (network/HTTP/timeout errors) + error_id = generate_error_id() + had_stale = refresh_result.get("had_stale_cache", False) + + if had_stale: + logger.warning( + f"[{error_id}] Upstream service unavailable, stale cache exists" + ) + # Temporarily unavailable but we have fallback + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail={ + "error": "Upstream service temporarily unavailable", + "error_id": error_id, + "message": "Stale configuration available as fallback", + }, + ) + else: + logger.warning( + f"[{error_id}] Upstream service unavailable, no cache exists" + ) + # Service gateway error or timeout + raise HTTPException( + status_code=status.HTTP_502_BAD_GATEWAY, + detail={ + "error": refresh_result.get("message"), + "error_id": error_id, + "details": refresh_result.get("error"), + }, + ) + + else: + # Unexpected status - should never happen but handle defensively + error_id = generate_error_id() + logger.error(f"[{error_id}] Unexpected refresh status: {refresh_status}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail={ + "error": "Unexpected error during refresh", + "error_id": error_id, + }, + ) + + except HTTPException: + # Re-raise HTTP exceptions as-is + raise + except Exception as e: + # Unexpected errors during refresh + error_id = generate_error_id() + logger.error(f"[{error_id}] Failed to refresh prompt configuration: {e}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail={ + "error": "Failed to refresh prompt configuration", + "error_id": error_id, + }, + ) from e if __name__ == "__main__": diff --git a/src/llm_orchestrator_config/exceptions.py b/src/llm_orchestrator_config/exceptions.py index 5d610636..1b74d0b4 100644 --- a/src/llm_orchestrator_config/exceptions.py +++ b/src/llm_orchestrator_config/exceptions.py @@ -1,5 +1,7 @@ """Custom 
exceptions for the LLM Config Module.""" +from typing import Optional + class LLMConfigError(Exception): """Base exception for LLM configuration errors.""" @@ -52,7 +54,7 @@ class ContextualRetrievalFailureError(ContextualRetrievalError): class StreamTimeoutException(LLMConfigError): """Raised when stream duration exceeds maximum allowed time.""" - def __init__(self, message: str = "Stream timeout", error_id: str = None): + def __init__(self, message: str = "Stream timeout", error_id: Optional[str] = None): """ Initialize StreamTimeoutException with error tracking. @@ -76,7 +78,7 @@ class StreamSizeLimitException(LLMConfigError): class StreamException(LLMConfigError): """Base exception for streaming operations with error tracking.""" - def __init__(self, message: str, error_id: str = None): + def __init__(self, message: str, error_id: Optional[str] = None): """ Initialize StreamException with error tracking. diff --git a/src/llm_orchestrator_config/llm_ochestrator_constants.py b/src/llm_orchestrator_config/llm_ochestrator_constants.py index 61af6963..90d01ed5 100644 --- a/src/llm_orchestrator_config/llm_ochestrator_constants.py +++ b/src/llm_orchestrator_config/llm_ochestrator_constants.py @@ -124,3 +124,9 @@ def get_localized_message(message_dict: dict, language_code: str = "en") -> str: RAG_SEARCH_RESQL = "http://resql:8082/rag-search" RAG_SEARCH_RUUTER_PUBLIC = "http://ruuter-public:8086/rag-search" RAG_SEARCH_RUUTER_PRIVATE = "http://ruuter-private:8088/rag-search" + +# Custom Prompt Configuration +RUUTER_PROMPT_CONFIG_ENDPOINT = ( + "http://ruuter-public:8086/rag-search/llm-connections/prompts/get-prompt" +) +PROMPT_CONFIG_CACHE_TTL = 300 # 5 minutes cache diff --git a/src/optimization/metrics/generator_metrics.py b/src/optimization/metrics/generator_metrics.py index becf64a0..3034f1e7 100644 --- a/src/optimization/metrics/generator_metrics.py +++ b/src/optimization/metrics/generator_metrics.py @@ -5,6 +5,7 @@ from typing import Any, Dict, List import dspy +from dspy.evaluate import SemanticF1 from loguru import logger @@ -34,7 +35,7 @@ def __init__(self, scope_weight: float = 0.5, quality_weight: float = 0.5): # Initialize DSPy's native SemanticF1 with decompositional mode # This uses the configured LM to evaluate semantic similarity - self.semantic_f1 = dspy.evaluate.SemanticF1(decompositional=True) + self.semantic_f1 = SemanticF1(decompositional=True) logger.info("Initialized GeneratorMetric with DSPy's native SemanticF1") @@ -97,6 +98,11 @@ def __call__( quality_score = self.semantic_f1(semantic_example, semantic_prediction) + # Ensure quality_score is a float (SemanticF1 returns float) + quality_score = ( + float(quality_score) if quality_score is not None else 0.0 + ) + logger.debug(f"SemanticF1 quality score: {quality_score:.3f}") except Exception as e: diff --git a/src/response_generator/response_generate.py b/src/response_generator/response_generate.py index 23aa7442..3dffbfb5 100644 --- a/src/response_generator/response_generate.py +++ b/src/response_generator/response_generate.py @@ -67,7 +67,7 @@ class ScopeChecker(dspy.Signature): def build_context_and_citations( - chunks: List[Dict[str, Any]], use_top_k: int = None + chunks: List[Dict[str, Any]], use_top_k: Optional[int] = None ) -> Tuple[List[str], List[str], bool]: """ Turn retriever chunks -> numbered context blocks and source labels. 
@@ -124,9 +124,15 @@ class ResponseGeneratorAgent(dspy.Module): Returns a dict: {"answer": str, "questionOutOfLLMScope": bool, "usage": dict} """ - def __init__(self, max_retries: int = 2, use_optimized: bool = True) -> None: + def __init__( + self, + max_retries: int = 2, + use_optimized: bool = True, + custom_instructions_prefix: str = "", + ) -> None: super().__init__() self._max_retries = max(0, int(max_retries)) + self._custom_instructions_prefix = custom_instructions_prefix # Attribute to cache the streamified predictor self._stream_predictor: Optional[Any] = None @@ -238,6 +244,14 @@ async def stream_response( f"Starting NATIVE DSPy streaming for question with {len(chunks)} chunks" ) + # Apply custom instructions while keeping the user question first, if provided + augmented_question = question + if self._custom_instructions_prefix: + augmented_question = f"{question}\n\n{self._custom_instructions_prefix}" + logger.debug( + f"Applied custom instructions after question for streaming ({len(self._custom_instructions_prefix)} chars)" + ) + output_stream = None try: # Build context @@ -254,10 +268,10 @@ async def stream_response( # Get the streamified predictor stream_predictor = self._get_stream_predictor() - # Call the streamified predictor + # Call the streamified predictor with augmented question logger.info("Calling streamified predictor with signature inputs...") output_stream = stream_predictor( - question=question, + question=augmented_question, context_blocks=context_blocks, citations=citation_labels, ) @@ -391,6 +405,14 @@ def forward( logger.info(f"Generating response for question: '{question}'") + # Apply custom instructions while keeping the user question first, if provided + augmented_question = question + if self._custom_instructions_prefix: + augmented_question = f"{question}\n\n{self._custom_instructions_prefix}" + logger.debug( + f"Applied custom instructions after question ({len(self._custom_instructions_prefix)} chars)" + ) + lm = dspy.settings.lm history_length_before = len(lm.history) if lm and hasattr(lm, "history") else 0 @@ -398,7 +420,7 @@ def forward( chunks, use_top_k=max_blocks ) - pred = self._predict_once(question, context_blocks, citation_labels) + pred = self._predict_once(augmented_question, context_blocks, citation_labels) valid = self._validate_prediction(pred) attempts = 0 @@ -407,7 +429,7 @@ def forward( logger.warning(f"Retry attempt {attempts}/{self._max_retries}") pred = self._predictor( - question=question, + question=augmented_question, context_blocks=context_blocks, citations=citation_labels, config={"rollout_id": attempts, "temperature": 0.1}, diff --git a/src/utils/prompt_config_loader.py b/src/utils/prompt_config_loader.py new file mode 100644 index 00000000..8df8945b --- /dev/null +++ b/src/utils/prompt_config_loader.py @@ -0,0 +1,414 @@ +""" +Prompt configuration loader with HTTP client, caching, and retry logic. +""" + +import requests +from typing import Optional, Dict, Any +import time +import threading +from enum import Enum +from loguru import logger + + +class PromptConfigLoadError(Exception): + """Raised when all retry attempts to load prompt configuration fail.""" + + pass + + +class RefreshStatus(Enum): + """Status of a refresh operation.""" + + SUCCESS = "success" # Configuration loaded successfully + NOT_FOUND = "not_found" # Configuration absent in database + FETCH_FAILED = "fetch_failed" # Network/HTTP/upstream errors + + +class PromptConfigurationLoader: + """ + Loads custom prompt configurations from Ruuter endpoint. 
+ + Features: + - HTTP-based loading via Ruuter + - 5-minute TTL cache (configurable) + - 3-attempt retry with exponential backoff + - Thread-safe caching + - Graceful degradation with stale cache fallback + """ + + def __init__( + self, + ruuter_endpoint: str, + cache_ttl_seconds: int = 300, + max_retries: int = 3, + timeout_seconds: int = 10, + ) -> None: + """ + Initialize prompt configuration loader. + + Args: + ruuter_endpoint: Full URL to Ruuter endpoint + cache_ttl_seconds: Cache TTL in seconds (default: 300 = 5 minutes) + max_retries: Maximum retry attempts on failure (default: 3) + timeout_seconds: HTTP request timeout (default: 10) + """ + self.ruuter_endpoint = ruuter_endpoint + self.cache_ttl_seconds = cache_ttl_seconds + self.max_retries = max_retries + self.timeout_seconds = timeout_seconds + + # Cache storage + self._cached_prompt: Optional[str] = None + self._cache_timestamp: Optional[float] = None + self._cache_lock = threading.Lock() + self._cache_condition = threading.Condition(self._cache_lock) + self._fetch_in_progress = False + + # Statistics for monitoring + self._cache_hits = 0 + self._cache_misses = 0 + self._load_failures = 0 + self._last_error: Optional[str] = None + + logger.info( + f"PromptConfigurationLoader initialized: " + f"endpoint={ruuter_endpoint}, ttl={cache_ttl_seconds}s, retries={max_retries}" + ) + + def get_custom_instructions(self) -> str: + """ + Get custom prompt configuration (cached or fresh). + + Uses fine-grained locking with thundering herd prevention: + - Quick cache check under lock + - Release lock during slow network I/O + - Only one thread fetches, others wait + - Re-acquire lock to update cache + + Returns: + str: Custom instruction text, or empty string if unavailable + """ + # Step 1: Quick cache check under lock + with self._cache_condition: + # Check cache validity + if self._is_cache_valid(): + self._cache_hits += 1 + logger.debug( + f"Prompt config cache HIT " + f"(age: {self._get_cache_age():.1f}s, " + f"hits: {self._cache_hits}, misses: {self._cache_misses})" + ) + return self._cached_prompt or "" + + # Cache miss/expired + self._cache_misses += 1 + logger.info( + f"Prompt config cache MISS - loading from Ruuter " + f"(cache age: {self._get_cache_age():.1f}s)" + ) + + # Thundering herd prevention: if another thread is fetching, wait + while self._fetch_in_progress: + logger.debug("Another thread is fetching, waiting...") + self._cache_condition.wait() # Release lock and wait + # After waking up, check if cache was updated + if self._is_cache_valid(): + logger.debug("Cache updated by another thread") + return self._cached_prompt or "" + + # We're the first one, mark fetch in progress + self._fetch_in_progress = True + + # Step 2: Fetch WITHOUT holding lock (allows concurrent cache reads) + prompt_text = None + fetch_error = None + try: + prompt_text = self._load_from_ruuter_with_retry() + + except PromptConfigLoadError as e: + fetch_error = e + logger.error(f"Failed to fetch prompt configuration after retries: {e}") + + except Exception as e: + fetch_error = e + logger.error(f"Unexpected error loading prompt configuration: {e}") + + # Step 3: Update cache and notify waiters (lock re-acquired) + with self._cache_condition: + try: + if prompt_text: + # Success - update cache + self._cached_prompt = prompt_text + self._cache_timestamp = time.time() + self._last_error = None + logger.info( + f"Prompt configuration loaded successfully " + f"({len(prompt_text)} chars)" + ) + return prompt_text + + elif prompt_text is None and 
fetch_error is None: + # No configuration found - cache empty result to avoid repeated loads + logger.warning( + "No prompt configuration found in database; caching empty result" + ) + self._cached_prompt = "" + self._cache_timestamp = time.time() + self._last_error = None + return "" + + else: + # Fetch failed - handle error + self._load_failures += 1 + self._last_error = str(fetch_error) + logger.error( + f"Failed to fetch prompt configuration " + f"(total failures: {self._load_failures})" + ) + # Fallback to stale cache or empty string + if self._cached_prompt: + logger.warning( + f"Using stale cache due to fetch failure (age: {self._get_cache_age():.1f}s)" + ) + return self._cached_prompt or "" + + finally: + # Always clear in-progress flag and notify waiting threads + self._fetch_in_progress = False + self._cache_condition.notify_all() # Wake up all waiting threads + + def _is_cache_valid(self) -> bool: + """Check if cache is within TTL window.""" + if self._cached_prompt is None or self._cache_timestamp is None: + return False + + age = time.time() - self._cache_timestamp + return age < self.cache_ttl_seconds + + def _get_cache_age(self) -> float: + """Get cache age in seconds.""" + if self._cache_timestamp is None: + return float("inf") + return time.time() - self._cache_timestamp + + def _load_from_ruuter_with_retry(self) -> Optional[str]: + """ + Load configuration from Ruuter with exponential backoff retry. + + Retry strategy: + - Attempt 1: 0s wait + - Attempt 2: 1s wait + - Attempt 3: 2s wait + + Returns: + Optional[str]: Prompt text if found, None if configuration is empty/not found + + Raises: + PromptConfigLoadError: If all retry attempts fail due to HTTP/network errors + """ + for attempt in range(1, self.max_retries + 1): + try: + logger.debug( + f"Calling Ruuter endpoint " + f"(attempt {attempt}/{self.max_retries}): {self.ruuter_endpoint}" + ) + + response = requests.post( + self.ruuter_endpoint, + json={}, # Empty POST body + timeout=self.timeout_seconds, + headers={"Content-Type": "application/json"}, + ) + + # Check HTTP status + if response.status_code == 200: + data = response.json() + + # Handle response format - Ruuter wraps response in 'response' key + prompt = "" + + # Unwrap Ruuter's response wrapper if present + if isinstance(data, dict) and "response" in data: + logger.info("Unwrapping 'response' key") + data = data["response"] + + # Now extract prompt from the unwrapped data + if isinstance(data, list) and len(data) > 0: + # Array format: [{"id": 1, "prompt": "..."}] + first_elem_keys = ( + list(data[0].keys()) if isinstance(data[0], dict) else [] + ) + logger.info( + f"Extracting from list, first element keys: {first_elem_keys}" + ) + prompt = data[0].get("prompt", "").strip() + elif isinstance(data, dict): + # Dict format: {"id": 1, "prompt": "..."} + logger.info(f"Extracting from dict, keys: {list(data.keys())}") + prompt = data.get("prompt", "").strip() + else: + logger.warning( + f"Unexpected data type: {type(data).__name__}, structure not recognized" + ) + + logger.info( + f"Extracted prompt length: {len(prompt) if prompt else 0}" + ) + + if prompt: + logger.info( + f"Loaded prompt on attempt {attempt} ({len(prompt)} chars)" + ) + return prompt + else: + logger.warning(f"Prompt field is empty (attempt {attempt})") + return None # Database has no configuration + + else: + logger.warning( + f"HTTP {response.status_code} on attempt {attempt}: " + f"{response.text[:200]}" + ) + + except requests.exceptions.Timeout: + logger.warning( + f"Request timeout on 
attempt {attempt} " + f"(timeout: {self.timeout_seconds}s)" + ) + + except requests.exceptions.ConnectionError as e: + logger.warning(f"Connection error on attempt {attempt}: {str(e)[:100]}") + + except requests.exceptions.RequestException as e: + logger.warning(f"Request error on attempt {attempt}: {str(e)[:100]}") + + except (ValueError, KeyError) as e: + logger.error(f"Invalid response format on attempt {attempt}: {e}") + + except Exception as e: + logger.error(f"Unexpected error on attempt {attempt}: {e}") + + # Wait before retry (except on last attempt) + if attempt < self.max_retries: + wait_time = 2 ** (attempt - 1) # 1s, 2s + logger.debug(f"Retrying in {wait_time}s...") + time.sleep(wait_time) + + # All retries failed - raise exception to distinguish from "not found" + error_msg = ( + f"All {self.max_retries} attempts failed to load prompt configuration" + ) + logger.error(error_msg) + raise PromptConfigLoadError(error_msg) + + def force_refresh(self) -> bool: + """ + Force immediate cache refresh. + + Returns: + bool: True if fresh data was successfully loaded, False otherwise + """ + status_dict = self.force_refresh_with_status() + return status_dict["status"] == RefreshStatus.SUCCESS + + def force_refresh_with_status(self) -> Dict[str, Any]: + """ + Force immediate cache refresh and return detailed status. + + Returns: + Dict with keys: + - status: RefreshStatus enum value + - message: Human-readable message + - error: Error message (if status != SUCCESS) + """ + logger.info("Forcing prompt configuration cache refresh") + + # Track state before refresh + had_cached_value = self._cached_prompt is not None + + with self._cache_condition: + # Invalidate both timestamp and cached value so that a failed refresh + # cannot fall back to a stale prompt and be misreported as success. 
+ self._cache_timestamp = None + self._cached_prompt = None + + # Attempt fresh load + prompt_text = None + fetch_error = None + try: + prompt_text = self._load_from_ruuter_with_retry() + + except PromptConfigLoadError as e: + fetch_error = e + logger.error(f"Failed to fetch prompt configuration after retries: {e}") + + except Exception as e: + fetch_error = e + logger.error(f"Unexpected error loading prompt configuration: {e}") + + # Determine status and update cache + with self._cache_condition: + if prompt_text: + # Success - update cache + self._cached_prompt = prompt_text + self._cache_timestamp = time.time() + self._last_error = None + logger.info( + f"Prompt configuration refreshed successfully ({len(prompt_text)} chars)" + ) + return { + "status": RefreshStatus.SUCCESS, + "message": "Prompt configuration refreshed successfully", + "length": len(prompt_text), + } + + elif prompt_text is None and fetch_error is None: + # No configuration found (explicit empty response from Ruuter) + self._cached_prompt = "" + self._cache_timestamp = time.time() + self._last_error = None + logger.warning("No prompt configuration found in database") + return { + "status": RefreshStatus.NOT_FOUND, + "message": "No prompt configuration found in database", + "error": None, + } + + else: + # Fetch failed (network/HTTP/timeout errors) + self._load_failures += 1 + self._last_error = str(fetch_error) + logger.error(f"Failed to fetch prompt configuration: {fetch_error}") + + # Do NOT cache empty result on failure - let next call retry + # Only keep stale cache if it existed before + if had_cached_value: + logger.warning("Keeping stale cache due to fetch failure") + + return { + "status": RefreshStatus.FETCH_FAILED, + "message": "Failed to refresh configuration due to upstream service error", + "error": str(fetch_error), + "had_stale_cache": had_cached_value, + } + + def get_cache_stats(self) -> Dict[str, Any]: + """Get cache statistics for monitoring.""" + with self._cache_condition: + return { + "cache_hits": self._cache_hits, + "cache_misses": self._cache_misses, + "load_failures": self._load_failures, + "cache_age_seconds": ( + round(self._get_cache_age(), 2) if self._is_cache_valid() else None + ), + "has_cached_value": self._cached_prompt is not None, + "cache_valid": self._is_cache_valid(), + "cached_prompt_length": ( + len(self._cached_prompt) if self._cached_prompt else 0 + ), + "last_error": self._last_error, + "ruuter_endpoint": self.ruuter_endpoint, + "cache_ttl_seconds": self.cache_ttl_seconds, + "fetch_in_progress": self._fetch_in_progress, + }