15 changes: 13 additions & 2 deletions docs/CUSTOM_PROMPT_CONFIGURATION.md
@@ -165,7 +165,7 @@ The custom prompt configuration system allows admins to configure prompts via UI
3. **Cache Invalidation** (`/prompt-config/refresh`)
- `force_refresh()` clears cache immediately
- Fetches new prompt from Ruuter
- Returns success status with prompt preview
- Returns success status with prompt length and content hash (no preview for security)

### **User Request Processing**
1. **Request Received** (Any of 3 endpoints)
@@ -192,7 +192,7 @@ The custom prompt configuration system allows admins to configure prompts via UI
5. **Question Modification**
```python
# In forward() or stream_response()
modified_question = f"{custom_instructions_prefix}{user_question}"
modified_question = f"{user_question}{custom_instructions_prefix}"
```
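Concretely, the reordering moves the admin-configured instructions from before the user's question to after it. A minimal sketch with illustrative values (variable names come from the snippet above; the instruction text is whatever an admin configured):

```python
custom_instructions_prefix = "\n\nAlways cite the source document."  # illustrative value
user_question = "What are the library's opening hours?"

# After this change, the custom instructions are appended to the question:
modified_question = f"{user_question}{custom_instructions_prefix}"
print(modified_question)
# What are the library's opening hours?
#
# Always cite the source document.
```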

6. **LLM Processing**
@@ -275,6 +275,17 @@ WHERE id = 1;
curl -X POST http://localhost:8100/prompt-config/refresh
```

**Response:**
```json
{
  "refreshed": true,
  "message": "Prompt configuration refreshed successfully",
  "prompt_length": 245,
  "content_hash": "a3f5b8c9e1d2f4a6"
}
```
**Note:** For security, the endpoint returns only the prompt length and a truncated SHA-256 content hash, never the prompt text itself.
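For illustration, a truncated SHA-256 digest like the `content_hash` above can be computed as follows; this is a minimal sketch, and the exact truncation length and encoding used by the service are assumptions based on the example response:

```python
import hashlib

def prompt_content_hash(prompt: str, length: int = 16) -> str:
    """Truncated SHA-256 hex digest of the prompt text (hypothetical helper)."""
    return hashlib.sha256(prompt.encode("utf-8")).hexdigest()[:length]

# Comparing hashes across refreshes confirms that a configuration change
# propagated, without ever exposing the prompt content.
print(prompt_content_hash("You are a helpful assistant."))
```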

---

## Key Features
58 changes: 34 additions & 24 deletions src/llm_orchestration_service.py
@@ -1055,19 +1055,14 @@ def _execute_orchestration_pipeline(
timing_dict["response_generation"] = time.time() - start_time

# Step 5: Output Guardrails Check
# Only apply guardrails to OrchestrationResponse (production/testing),
# TestOrchestrationResponse doesn't require chatId-based guardrail handling
# Apply guardrails to all response types for consistent safety across all environments
start_time = time.time()
if isinstance(generated_response, OrchestrationResponse):
output_guardrails_response = self.handle_output_guardrails(
components["guardrails_adapter"],
generated_response,
request,
costs_dict,
)
else:
# TestOrchestrationResponse - skip output guardrails
output_guardrails_response = generated_response
output_guardrails_response = self.handle_output_guardrails(
components["guardrails_adapter"],
generated_response,
request,
costs_dict,
)
timing_dict["output_guardrails_check"] = time.time() - start_time

# Step 6: Store inference data (for production and testing environments)
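To illustrate the simplified control flow, the sketch below uses stubbed stand-ins for the two response models (field names taken from this PR; the guardrails call is reduced to a print) to show both types now taking the same path:

```python
from dataclasses import dataclass
from typing import Union

@dataclass
class OrchestrationResponse:  # stub of the production/testing model
    chatId: str
    llmServiceActive: bool
    questionOutOfLLMScope: bool
    inputGuardFailed: bool
    content: str

@dataclass
class TestOrchestrationResponse:  # stub of the test-environment model
    llmServiceActive: bool
    questionOutOfLLMScope: bool
    inputGuardFailed: bool
    content: str

def handle_output_guardrails(
    resp: Union[OrchestrationResponse, TestOrchestrationResponse],
):
    # One code path for both types - no isinstance() branching at the call site
    if resp.llmServiceActive and not resp.questionOutOfLLMScope:
        print(f"guardrails checked: {type(resp).__name__}")
    return resp

handle_output_guardrails(OrchestrationResponse("chat-1", True, False, False, "hi"))
handle_output_guardrails(TestOrchestrationResponse(True, False, False, "hi"))
```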
@@ -1263,16 +1258,21 @@ async def _safe_retrieve_contextual_chunks(
def handle_output_guardrails(
self,
guardrails_adapter: Optional[NeMoRailsAdapter],
generated_response: OrchestrationResponse,
generated_response: Union[OrchestrationResponse, TestOrchestrationResponse],
request: OrchestrationRequest,
costs_dict: Dict[str, Dict[str, Any]],
) -> OrchestrationResponse:
"""Check output guardrails and handle blocked responses."""
if (
) -> Union[OrchestrationResponse, TestOrchestrationResponse]:
"""Check output guardrails and handle blocked responses for both response types."""
# Determine if we should run guardrails (same logic for both response types)
should_check_guardrails = (
guardrails_adapter is not None
and generated_response.llmServiceActive
and not generated_response.questionOutOfLLMScope
):
)

if should_check_guardrails:
# Type assertion: should_check_guardrails guarantees guardrails_adapter is not None
assert guardrails_adapter is not None
output_check_result = self._check_output_guardrails(
guardrails_adapter=guardrails_adapter,
assistant_message=generated_response.content,
@@ -1289,13 +1289,23 @@ def handle_output_guardrails(
OUTPUT_GUARDRAIL_VIOLATION_MESSAGES, detected_lang
)

return OrchestrationResponse(
chatId=request.chatId,
llmServiceActive=True,
questionOutOfLLMScope=False,
inputGuardFailed=False,
content=localized_msg,
)
# Return appropriate response type based on original response type
if isinstance(generated_response, TestOrchestrationResponse):
return TestOrchestrationResponse(
llmServiceActive=True,
questionOutOfLLMScope=False,
inputGuardFailed=False,
content=localized_msg,
chunks=None,
)
else:
return OrchestrationResponse(
chatId=request.chatId,
llmServiceActive=True,
questionOutOfLLMScope=False,
inputGuardFailed=False,
content=localized_msg,
)

logger.info("Output guardrails check passed")
else:
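A side note on the widened `Union` signature of `handle_output_guardrails`: callers lose the precise response type and may need `isinstance` narrowing. A constrained `TypeVar`, sketched below, is the usual way to express "returns the same type it receives"; this is a hypothetical refinement, not part of this PR:

```python
from typing import TypeVar

# Constrained TypeVar; the string constraints are forward references
# to the real model classes.
ResponseT = TypeVar("ResponseT", "OrchestrationResponse", "TestOrchestrationResponse")

def handle_output_guardrails(response: ResponseT) -> ResponseT:
    ...  # guardrails logic unchanged; only the signature is narrowed
```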
88 changes: 88 additions & 0 deletions src/llm_orchestration_service_api.py
@@ -30,6 +30,7 @@
from src.utils.stream_timeout import stream_timeout
from src.utils.error_utils import generate_error_id, log_error_with_context
from src.utils.rate_limiter import RateLimiter
from src.utils.prompt_config_loader import RefreshStatus
from models.request_models import (
OrchestrationRequest,
OrchestrationResponse,
@@ -751,6 +752,93 @@ def refresh_prompt_config(http_request: Request) -> Dict[str, Any]:
},
)

try:
# Use new method that returns detailed status
refresh_result = (
orchestration_service.prompt_config_loader.force_refresh_with_status()
)
refresh_status = refresh_result.get("status")

if refresh_status == RefreshStatus.SUCCESS:
# Success - configuration loaded
logger.info("Prompt configuration refreshed successfully")
return {
"refreshed": True,
"message": refresh_result.get("message"),
"prompt_length": refresh_result.get("length"),
}

elif refresh_status == RefreshStatus.NOT_FOUND:
# Configuration absent in database
error_id = generate_error_id()
logger.warning(f"[{error_id}] Prompt configuration not found in database")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail={
"error": refresh_result.get("message"),
"error_id": error_id,
},
)

elif refresh_status == RefreshStatus.FETCH_FAILED:
# Upstream service failure (network/HTTP/timeout errors)
error_id = generate_error_id()
had_stale = refresh_result.get("had_stale_cache", False)

if had_stale:
logger.warning(
f"[{error_id}] Upstream service unavailable; a cached configuration existed before this refresh"
)
# The refresh itself invalidated the cached value, so this only
# reports that a configuration existed prior to the attempt
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail={
"error": "Upstream service temporarily unavailable",
"error_id": error_id,
"message": "A cached configuration existed before this refresh",
},
},
)
else:
logger.warning(
f"[{error_id}] Upstream service unavailable, no cache exists"
)
# Service gateway error or timeout
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail={
"error": refresh_result.get("message"),
"error_id": error_id,
"details": refresh_result.get("error"),
},
)

else:
# Unexpected status - should never happen but handle defensively
error_id = generate_error_id()
logger.error(f"[{error_id}] Unexpected refresh status: {refresh_status}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail={
"error": "Unexpected error during refresh",
"error_id": error_id,
},
)

except HTTPException:
# Re-raise HTTP exceptions as-is
raise
except Exception as e:
# Unexpected errors during refresh
error_id = generate_error_id()
logger.error(f"[{error_id}] Failed to refresh prompt configuration: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail={
"error": "Failed to refresh prompt configuration",
"error_id": error_id,
},
) from e
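From the caller's side, the four outcomes map onto distinct status codes. A client-side sketch (the endpoint URL follows the docs above; the handling logic is illustrative and assumes the `requests` package):

```python
import requests

def refresh_prompt_config(base_url: str = "http://localhost:8100") -> bool:
    """POST to the refresh endpoint and branch on the documented status codes."""
    resp = requests.post(f"{base_url}/prompt-config/refresh", timeout=10)
    if resp.status_code == 200:
        print(f"refreshed, prompt_length={resp.json().get('prompt_length')}")
        return True
    if resp.status_code == 404:
        print("no prompt configuration exists in the database")
    elif resp.status_code == 503:
        print("upstream unavailable; a configuration existed before the refresh")
    elif resp.status_code == 502:
        print("upstream unavailable and no configuration is cached")
    else:
        print(f"unexpected status {resp.status_code}: {resp.text}")
    return False
```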

try:
success = orchestration_service.prompt_config_loader.force_refresh()

96 changes: 92 additions & 4 deletions src/utils/prompt_config_loader.py
@@ -6,6 +6,7 @@
from typing import Optional, Dict, Any
import time
import threading
from enum import Enum
from loguru import logger


Expand All @@ -15,6 +16,14 @@ class PromptConfigLoadError(Exception):
pass


class RefreshStatus(Enum):
"""Status of a refresh operation."""

SUCCESS = "success" # Configuration loaded successfully
NOT_FOUND = "not_found" # Configuration absent in database
FETCH_FAILED = "fetch_failed" # Network/HTTP/upstream errors


class PromptConfigurationLoader:
"""
Loads custom prompt configurations from Ruuter endpoint.
@@ -226,15 +235,20 @@ def _load_from_ruuter_with_retry(self) -> Optional[str]:
# Now extract prompt from the unwrapped data
if isinstance(data, list) and len(data) > 0:
# Array format: [{"id": 1, "prompt": "..."}]
logger.info(f"Extracting from list, first element: {data[0]}")
first_elem_keys = (
list(data[0].keys()) if isinstance(data[0], dict) else []
)
logger.info(
f"Extracting from list, first element keys: {first_elem_keys}"
)
prompt = data[0].get("prompt", "").strip()
elif isinstance(data, dict):
# Dict format: {"id": 1, "prompt": "..."}
logger.info(f"Extracting from dict, keys: {list(data.keys())}")
prompt = data.get("prompt", "").strip()
else:
logger.warning(
f"Unexpected data type: {type(data)}, value: {data}"
f"Unexpected data type: {type(data).__name__}, structure not recognized"
)

logger.info(
@@ -294,15 +308,89 @@ def force_refresh(self) -> bool:
Returns:
bool: True if fresh data was successfully loaded, False otherwise
"""
status_dict = self.force_refresh_with_status()
return status_dict["status"] == RefreshStatus.SUCCESS

def force_refresh_with_status(self) -> Dict[str, Any]:
"""
Force immediate cache refresh and return detailed status.

Returns:
Dict with keys:
- status: RefreshStatus enum value
- message: Human-readable message
- error: Error message (if status != SUCCESS)
"""
logger.info("Forcing prompt configuration cache refresh")

# Track state before refresh
had_cached_value = self._cached_prompt is not None

with self._cache_condition:
# Invalidate both timestamp and cached value so that a failed refresh
# cannot fall back to a stale prompt and be misreported as success.
self._cache_timestamp = None
self._cached_prompt = None

result = self.get_custom_instructions()
return bool(result)
# Attempt fresh load
prompt_text = None
fetch_error = None
try:
prompt_text = self._load_from_ruuter_with_retry()

except PromptConfigLoadError as e:
fetch_error = e
logger.error(f"Failed to fetch prompt configuration after retries: {e}")

except Exception as e:
fetch_error = e
logger.error(f"Unexpected error loading prompt configuration: {e}")

# Determine status and update cache
with self._cache_condition:
if prompt_text:
# Success - update cache
self._cached_prompt = prompt_text
self._cache_timestamp = time.time()
self._last_error = None
logger.info(
f"Prompt configuration refreshed successfully ({len(prompt_text)} chars)"
)
return {
"status": RefreshStatus.SUCCESS,
"message": "Prompt configuration refreshed successfully",
"length": len(prompt_text),
}

elif prompt_text is None and fetch_error is None:
# No configuration found (explicit empty response from Ruuter)
self._cached_prompt = ""
self._cache_timestamp = time.time()
self._last_error = None
logger.warning("No prompt configuration found in database")
return {
"status": RefreshStatus.NOT_FOUND,
"message": "No prompt configuration found in database",
"error": None,
}

else:
# Fetch failed (network/HTTP/timeout errors)
self._load_failures += 1
self._last_error = str(fetch_error)
logger.error(f"Failed to fetch prompt configuration: {fetch_error}")

# Do NOT cache an empty result on failure - let the next call retry
# Note: the cached value was already invalidated at the start of this
# refresh, so only the fact that one existed beforehand is reported
if had_cached_value:
logger.warning("A cached configuration existed before this failed refresh")

return {
"status": RefreshStatus.FETCH_FAILED,
"message": "Failed to refresh configuration due to upstream service error",
"error": str(fetch_error),
"had_stale_cache": had_cached_value,
}

def get_cache_stats(self) -> Dict[str, Any]:
"""Get cache statistics for monitoring."""
Expand Down