diff --git a/docs/CUSTOM_PROMPT_CONFIGURATION.md b/docs/CUSTOM_PROMPT_CONFIGURATION.md
index 81e8226..8a7f94e 100644
--- a/docs/CUSTOM_PROMPT_CONFIGURATION.md
+++ b/docs/CUSTOM_PROMPT_CONFIGURATION.md
@@ -165,7 +165,7 @@ The custom prompt configuration system allows admins to configure prompts via UI
 3. **Cache Invalidation** (`/prompt-config/refresh`)
    - `force_refresh()` clears cache immediately
    - Fetches new prompt from Ruuter
-   - Returns success status with prompt preview
+   - Returns success status with the prompt length (no content preview, for security)
 
 ### **User Request Processing**
 1. **Request Received** (Any of 3 endpoints)
@@ -192,7 +192,7 @@ The custom prompt configuration system allows admins to configure prompts via UI
 5. **Question Modification**
    ```python
    # In forward() or stream_response()
-   modified_question = f"{user_question}{custom_instructions_prefix}"
+   modified_question = f"{custom_instructions_prefix}{user_question}"
    ```
 
 6. **LLM Processing**
@@ -275,6 +275,27 @@ The custom prompt configuration system allows admins to configure prompts via UI
 curl -X POST http://localhost:8100/prompt-config/refresh
 ```
 
+**Response:**
+```json
+{
+  "refreshed": true,
+  "message": "Prompt configuration refreshed successfully",
+  "prompt_length": 245
+}
+```
+**Note:** For security, the endpoint returns only the prompt length, never the prompt content itself.
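+
+**Error responses** use FastAPI's standard `HTTPException` envelope. For example, when no configuration exists in the database (HTTP 404):
+```json
+{
+  "detail": {
+    "error": "No prompt configuration found in database",
+    "error_id": "<generated per request>"
+  }
+}
+```
+A 503 is returned when the upstream fetch fails but a stale cached prompt remains available as a fallback, and a 502 when the fetch fails with no cached fallback. The `error_id` shown above is a placeholder; real values come from `generate_error_id()`.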
+
 ---
 
 ## Key Features
diff --git a/src/llm_orchestration_service.py b/src/llm_orchestration_service.py
index bcea291..92dd7b0 100644
--- a/src/llm_orchestration_service.py
+++ b/src/llm_orchestration_service.py
@@ -1055,19 +1055,14 @@ def _execute_orchestration_pipeline(
         timing_dict["response_generation"] = time.time() - start_time
 
         # Step 5: Output Guardrails Check
-        # Only apply guardrails to OrchestrationResponse (production/testing),
-        # TestOrchestrationResponse doesn't require chatId-based guardrail handling
+        # Apply guardrails to all response types for consistent safety across all environments
         start_time = time.time()
-        if isinstance(generated_response, OrchestrationResponse):
-            output_guardrails_response = self.handle_output_guardrails(
-                components["guardrails_adapter"],
-                generated_response,
-                request,
-                costs_dict,
-            )
-        else:
-            # TestOrchestrationResponse - skip output guardrails
-            output_guardrails_response = generated_response
+        output_guardrails_response = self.handle_output_guardrails(
+            components["guardrails_adapter"],
+            generated_response,
+            request,
+            costs_dict,
+        )
         timing_dict["output_guardrails_check"] = time.time() - start_time
 
         # Step 6: Store inference data (for production and testing environments)
@@ -1263,16 +1258,21 @@ async def _safe_retrieve_contextual_chunks(
     def handle_output_guardrails(
         self,
         guardrails_adapter: Optional[NeMoRailsAdapter],
-        generated_response: OrchestrationResponse,
+        generated_response: Union[OrchestrationResponse, TestOrchestrationResponse],
         request: OrchestrationRequest,
         costs_dict: Dict[str, Dict[str, Any]],
-    ) -> OrchestrationResponse:
-        """Check output guardrails and handle blocked responses."""
-        if (
+    ) -> Union[OrchestrationResponse, TestOrchestrationResponse]:
+        """Check output guardrails and handle blocked responses for both response types."""
+        # Determine if we should run guardrails (same logic for both response types)
+        should_check_guardrails = (
             guardrails_adapter is not None
             and generated_response.llmServiceActive
             and not generated_response.questionOutOfLLMScope
-        ):
+        )
+
+        if should_check_guardrails:
+            # Type assertion: should_check_guardrails guarantees guardrails_adapter is not None
+            assert guardrails_adapter is not None
             output_check_result = self._check_output_guardrails(
                 guardrails_adapter=guardrails_adapter,
                 assistant_message=generated_response.content,
@@ -1289,13 +1289,23 @@
                     OUTPUT_GUARDRAIL_VIOLATION_MESSAGES, detected_lang
                 )
-                return OrchestrationResponse(
-                    chatId=request.chatId,
-                    llmServiceActive=True,
-                    questionOutOfLLMScope=False,
-                    inputGuardFailed=False,
-                    content=localized_msg,
-                )
+                # Preserve the original response type when returning the violation message
+                if isinstance(generated_response, TestOrchestrationResponse):
+                    return TestOrchestrationResponse(
+                        llmServiceActive=True,
+                        questionOutOfLLMScope=False,
+                        inputGuardFailed=False,
+                        content=localized_msg,
+                        chunks=None,
+                    )
+                else:
+                    return OrchestrationResponse(
+                        chatId=request.chatId,
+                        llmServiceActive=True,
+                        questionOutOfLLMScope=False,
+                        inputGuardFailed=False,
+                        content=localized_msg,
+                    )
 
             logger.info("Output guardrails check passed")
         else:
diff --git a/src/llm_orchestration_service_api.py b/src/llm_orchestration_service_api.py
index b2c9f9f..8bdc80c 100644
--- a/src/llm_orchestration_service_api.py
+++ b/src/llm_orchestration_service_api.py
@@ -30,6 +30,7 @@
 from src.utils.stream_timeout import stream_timeout
 from src.utils.error_utils import generate_error_id, log_error_with_context
 from src.utils.rate_limiter import RateLimiter
+from src.utils.prompt_config_loader import RefreshStatus
 from models.request_models import (
     OrchestrationRequest,
     OrchestrationResponse,
@@ -751,6 +752,94 @@ def refresh_prompt_config(http_request: Request) -> Dict[str, Any]:
             },
         )
 
+    try:
+        # Use the status-aware refresh so failures can be mapped to HTTP errors
+        refresh_result = (
+            orchestration_service.prompt_config_loader.force_refresh_with_status()
+        )
+        refresh_status = refresh_result.get("status")
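+        # Status -> HTTP mapping handled below: SUCCESS -> 200 response body,
+        # NOT_FOUND -> 404, FETCH_FAILED -> 503 when a stale cache exists as a
+        # fallback (502 otherwise), any unexpected status -> defensive 500.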
{refresh_status}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail={ + "error": "Unexpected error during refresh", + "error_id": error_id, + }, + ) + + except HTTPException: + # Re-raise HTTP exceptions as-is + raise + except Exception as e: + # Unexpected errors during refresh + error_id = generate_error_id() + logger.error(f"[{error_id}] Failed to refresh prompt configuration: {e}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail={ + "error": "Failed to refresh prompt configuration", + "error_id": error_id, + }, + ) from e + try: success = orchestration_service.prompt_config_loader.force_refresh() diff --git a/src/utils/prompt_config_loader.py b/src/utils/prompt_config_loader.py index f7d5d3e..8df8945 100644 --- a/src/utils/prompt_config_loader.py +++ b/src/utils/prompt_config_loader.py @@ -6,6 +6,7 @@ from typing import Optional, Dict, Any import time import threading +from enum import Enum from loguru import logger @@ -15,6 +16,14 @@ class PromptConfigLoadError(Exception): pass +class RefreshStatus(Enum): + """Status of a refresh operation.""" + + SUCCESS = "success" # Configuration loaded successfully + NOT_FOUND = "not_found" # Configuration absent in database + FETCH_FAILED = "fetch_failed" # Network/HTTP/upstream errors + + class PromptConfigurationLoader: """ Loads custom prompt configurations from Ruuter endpoint. @@ -226,7 +235,12 @@ def _load_from_ruuter_with_retry(self) -> Optional[str]: # Now extract prompt from the unwrapped data if isinstance(data, list) and len(data) > 0: # Array format: [{"id": 1, "prompt": "..."}] - logger.info(f"Extracting from list, first element: {data[0]}") + first_elem_keys = ( + list(data[0].keys()) if isinstance(data[0], dict) else [] + ) + logger.info( + f"Extracting from list, first element keys: {first_elem_keys}" + ) prompt = data[0].get("prompt", "").strip() elif isinstance(data, dict): # Dict format: {"id": 1, "prompt": "..."} @@ -234,7 +248,7 @@ def _load_from_ruuter_with_retry(self) -> Optional[str]: prompt = data.get("prompt", "").strip() else: logger.warning( - f"Unexpected data type: {type(data)}, value: {data}" + f"Unexpected data type: {type(data).__name__}, structure not recognized" ) logger.info( @@ -294,15 +308,89 @@ def force_refresh(self) -> bool: Returns: bool: True if fresh data was successfully loaded, False otherwise """ + status_dict = self.force_refresh_with_status() + return status_dict["status"] == RefreshStatus.SUCCESS + + def force_refresh_with_status(self) -> Dict[str, Any]: + """ + Force immediate cache refresh and return detailed status. + + Returns: + Dict with keys: + - status: RefreshStatus enum value + - message: Human-readable message + - error: Error message (if status != SUCCESS) + """ logger.info("Forcing prompt configuration cache refresh") + + # Track state before refresh + had_cached_value = self._cached_prompt is not None + with self._cache_condition: # Invalidate both timestamp and cached value so that a failed refresh # cannot fall back to a stale prompt and be misreported as success. 
+        """
         logger.info("Forcing prompt configuration cache refresh")
+
+        # Track prior state so a stale value can be restored if the fetch fails
+        previous_prompt = self._cached_prompt
+        had_cached_value = previous_prompt is not None
+
         with self._cache_condition:
             # Invalidate both timestamp and cached value so that a failed refresh
             # cannot fall back to a stale prompt and be misreported as success.
             self._cache_timestamp = None
             self._cached_prompt = None
-        result = self.get_custom_instructions()
-        return bool(result)
+        # Attempt fresh load
+        prompt_text = None
+        fetch_error = None
+        try:
+            prompt_text = self._load_from_ruuter_with_retry()
+
+        except PromptConfigLoadError as e:
+            fetch_error = e
+            logger.error(f"Failed to fetch prompt configuration after retries: {e}")
+
+        except Exception as e:
+            fetch_error = e
+            logger.error(f"Unexpected error loading prompt configuration: {e}")
+
+        # Determine status and update cache
+        with self._cache_condition:
+            if prompt_text:
+                # Success - update cache
+                self._cached_prompt = prompt_text
+                self._cache_timestamp = time.time()
+                self._last_error = None
+                logger.info(
+                    f"Prompt configuration refreshed successfully ({len(prompt_text)} chars)"
+                )
+                return {
+                    "status": RefreshStatus.SUCCESS,
+                    "message": "Prompt configuration refreshed successfully",
+                    "length": len(prompt_text),
+                }
+
+            elif fetch_error is None:
+                # Ruuter answered, but no prompt is configured (None or empty string)
+                self._cached_prompt = ""
+                self._cache_timestamp = time.time()
+                self._last_error = None
+                logger.warning("No prompt configuration found in database")
+                return {
+                    "status": RefreshStatus.NOT_FOUND,
+                    "message": "No prompt configuration found in database",
+                    "error": None,
+                }
+
+            else:
+                # Fetch failed (network/HTTP/timeout errors)
+                self._load_failures += 1
+                self._last_error = str(fetch_error)
+                logger.error(f"Failed to fetch prompt configuration: {fetch_error}")
+
+                # Do NOT cache an empty result on failure - let the next call retry.
+                # Restore the stale value (cleared above) so it can serve as a fallback.
+                if had_cached_value:
+                    self._cached_prompt = previous_prompt
+                    logger.warning("Keeping stale cache due to fetch failure")
+
+                return {
+                    "status": RefreshStatus.FETCH_FAILED,
+                    "message": "Failed to refresh configuration due to upstream service error",
+                    "error": str(fetch_error),
+                    "had_stale_cache": had_cached_value,
+                }
 
     def get_cache_stats(self) -> Dict[str, Any]:
         """Get cache statistics for monitoring."""