diff --git a/netra/__init__.py b/netra/__init__.py index cb544fb..8a9ef20 100644 --- a/netra/__init__.py +++ b/netra/__init__.py @@ -291,6 +291,7 @@ def shutdown(cls) -> None: meter_provider.shutdown() except Exception: pass + # Close simulation HTTP client if hasattr(cls, "simulation") and cls.simulation is not None: try: @@ -298,6 +299,13 @@ def shutdown(cls) -> None: except Exception: pass + # Close evaluation HTTP client + if hasattr(cls, "evaluation") and cls.evaluation is not None: + try: + cls.evaluation.close() + except Exception: + pass + @classmethod def get_meter(cls, name: str = "netra", version: Optional[str] = None) -> otel_metrics.Meter: """ diff --git a/netra/evaluation/__init__.py b/netra/evaluation/__init__.py index 2f2959b..7282821 100644 --- a/netra/evaluation/__init__.py +++ b/netra/evaluation/__init__.py @@ -1,16 +1,20 @@ from netra.evaluation.api import Evaluation from netra.evaluation.evaluator import BaseEvaluator from netra.evaluation.models import ( + Dataset, DatasetItem, EvaluatorConfig, EvaluatorContext, EvaluatorOutput, LocalDataset, ScoreType, + TurnType, ) __all__ = [ "Evaluation", + "Dataset", + "TurnType", "DatasetItem", "BaseEvaluator", "EvaluatorContext", diff --git a/netra/evaluation/api.py b/netra/evaluation/api.py index edb9788..ca680cb 100644 --- a/netra/evaluation/api.py +++ b/netra/evaluation/api.py @@ -1,10 +1,11 @@ import asyncio import concurrent.futures import logging -from typing import Any, Callable, Dict, List, Optional +from typing import Any, Callable, Optional from netra.config import Config from netra.evaluation.client import EvaluationHttpClient +from netra.evaluation.constants import DEFAULT_CONCURRENCY, LOG_PREFIX, MIN_CONCURRENCY, SPAN_NAME_PREFIX from netra.evaluation.models import ( AddDatasetItemResponse, CreateDatasetResponse, @@ -35,11 +36,17 @@ class Evaluation: - """Public entry-point exposed as Netra.evaluation""" + """Public entry-point exposed as Netra.evaluation. + + Attributes: + _config: The Netra configuration object. + _client: The HTTP client for evaluation API calls. + """ + + __slots__ = ("_config", "_client") def __init__(self, config: Config) -> None: - """ - Initialize the evaluation client. + """Initialize the evaluation client. Args: config: The configuration object. @@ -47,20 +54,28 @@ def __init__(self, config: Config) -> None: self._config = config self._client = EvaluationHttpClient(config) - def create_dataset(self, name: str, tags: Optional[List[str]] = None, turn_type: TurnType = TurnType.SINGLE) -> Any: - """ - Create an empty dataset and return its id on success, else None. + def close(self) -> None: + """Release resources held by the evaluation client.""" + self._client.close() + + def create_dataset( + self, + name: str, + tags: Optional[list[str]] = None, + turn_type: TurnType = TurnType.SINGLE, + ) -> Optional[CreateDatasetResponse]: + """Create an empty dataset and return its info on success, else None. Args: name: The name of the dataset. tags: Optional list of tags to associate with the dataset. - turn_type: The turn type of the dataset, either "single" or "multi". Defaults to "single". + turn_type: The turn type of the dataset. Defaults to "single". Returns: - A backend JSON response containing dataset info (id, name, tags, etc.) on success, + A CreateDatasetResponse on success, or None on failure. """ if not name: - logger.error("netra.evaluation: Failed to create dataset: dataset name is required") + logger.error("%s: Failed to create dataset: dataset name is required", LOG_PREFIX) return None response = self._client.create_dataset(name=name, tags=tags, turn_type=turn_type) @@ -84,23 +99,24 @@ def add_dataset_item( self, dataset_id: str, item: DatasetItem, - ) -> Any: - """ - Add a single item to an existing dataset + ) -> Optional[AddDatasetItemResponse]: + """Add a single item to an existing dataset. Args: dataset_id: The id of the dataset to which the item will be added. item: The dataset item to add. Returns: - A backend JSON response containing dataset item info (id, input, expected_output, etc.) on success + An AddDatasetItemResponse on success, or None on failure. """ - if not item.input: - logger.error("netra.evaluation: Skipping dataset item without required 'input'") + logger.error("%s: Skipping dataset item without required 'input'", LOG_PREFIX) return None response = self._client.add_dataset_item(dataset_id=dataset_id, item=item) + if not response: + return None + return AddDatasetItemResponse( dataset_id=response.get("datasetId", ""), project_id=response.get("projectId", ""), @@ -120,30 +136,29 @@ def add_dataset_item( deleted_at=response.get("deletedAt", None), ) - def get_dataset(self, dataset_id: str) -> Any: - """ - Get a dataset by ID. + def get_dataset(self, dataset_id: str) -> Optional[GetDatasetItemsResponse]: + """Get a dataset by ID. Args: dataset_id: The id of the dataset to retrieve. Returns: - A backend JSON response containing dataset info (id, input, expected_output etc.) on success, + A GetDatasetItemsResponse on success, or None on failure. """ if not dataset_id: - logger.error("netra.evaluation: Failed to get dataset: dataset id is required") + logger.error("%s: Failed to get dataset: dataset id is required", LOG_PREFIX) return None response = self._client.get_dataset(dataset_id) if not response: return None - dataset_items: List[DatasetItem] = [] - for item in response: - item_id = item.get("id") - item_input = item.get("input") - item_dataset_id = item.get("datasetId") - item_expected_output = item.get("expectedOutput", "") + dataset_items: list[DatasetRecord] = [] + for raw_item in response: + item_id = raw_item.get("id") + item_input = raw_item.get("input") + item_dataset_id = raw_item.get("datasetId") + item_expected_output = raw_item.get("expectedOutput", "") if item_id is None or item_dataset_id is None or item_input is None: - logger.warning("netra.evaluation: Skipping dataset item with missing required fields: %s", item) + logger.warning("%s: Skipping dataset item with missing required fields: %s", LOG_PREFIX, raw_item) continue try: dataset_items.append( @@ -155,17 +170,16 @@ def get_dataset(self, dataset_id: str) -> Any: ) ) except Exception as exc: - logger.error("netra.evaluation: Failed to parse dataset item: %s", exc) + logger.error("%s: Failed to parse dataset item: %s", LOG_PREFIX, exc) return GetDatasetItemsResponse(items=dataset_items) def create_run( self, name: str, dataset_id: Optional[str] = None, - evaluators_config: Optional[List[EvaluatorConfig]] = None, - ) -> Any: - """ - Create a new run for the given dataset and evaluators + evaluators_config: Optional[list[EvaluatorConfig]] = None, + ) -> Optional[str]: + """Create a new run for the given dataset and evaluators. Args: name: The name of the run. @@ -173,46 +187,44 @@ def create_run( evaluators_config: Optional list of evaluators to be used for the run. Returns: - run_id: The id of the created run. + The id of the created run, or None on failure. """ - if not name: - logger.error("netra.evaluation: Failed to create run: run name is required") + logger.error("%s: Failed to create run: run name is required", LOG_PREFIX) return None - evaluators_config_dicts: Optional[List[Dict[str, Any]]] = None + evaluators_config_dicts: Optional[list[dict[str, Any]]] = None if evaluators_config: evaluators_config_dicts = [e.model_dump(by_alias=True) for e in evaluators_config] response = self._client.create_run(name=name, dataset_id=dataset_id, evaluators_config=evaluators_config_dicts) - run_id = response.get("id", None) - return run_id + if not response: + return None + run_id = response.get("id") + return str(run_id) if run_id is not None else None - def get_run_results(self, run_id: str) -> Any: - """ - Fetch test run results based on run ID. + def get_run_results(self, run_id: str) -> Optional[dict[str, Any]]: + """Fetch test run results based on run ID. Args: run_id: The id of the run to fetch. Returns: - The JSON response containing run results. + The JSON response containing run results, or None on failure. """ if not run_id: - logger.error("netra.evaluation: Failed to get run: run_id is required") + logger.error("%s: Failed to get run: run_id is required", LOG_PREFIX) return None - response = self._client.get_run_results(run_id) - return response + return self._client.get_run_results(run_id) def run_test_suite( self, name: str, data: Dataset, task: Callable[[Any], Any], - evaluators: Optional[List[Any]] = None, - max_concurrency: int = 50, - ) -> Optional[Dict[str, Any]]: - """ - Netra evaluation function to initiate a test suite. + evaluators: Optional[list[Any]] = None, + max_concurrency: int = DEFAULT_CONCURRENCY, + ) -> Optional[dict[str, Any]]: + """Run a complete evaluation test suite. Args: name: The name of the run. @@ -225,6 +237,7 @@ def run_test_suite( A dictionary containing the run id and the results of the test suite. """ validate_run_inputs(name, data, task) + max_concurrency = max(MIN_CONCURRENCY, max_concurrency) items = list(data.items) dataset_id = extract_dataset_id(items) @@ -236,18 +249,15 @@ def run_test_suite( evaluators_config=evaluators_config, ) if not run_id: - logger.error("netra.evaluation: Failed to create run") + logger.error("%s: Failed to create run", LOG_PREFIX) return None - logger.info("netra.evaluation: Initiated test run") + logger.info("%s: Initiated test run", LOG_PREFIX) try: result = run_async_safely( self._run_test_suite_async(name, data, task, evaluators, max_concurrency, run_id=run_id) ) return result - except Exception: - self._client.post_run_status(run_id, "failed") - raise except BaseException: self._client.post_run_status(run_id, "failed") raise @@ -257,12 +267,11 @@ async def _run_test_suite_async( name: str, data: Dataset, task: Callable[[Any], Any], - evaluators: Optional[List[Any]], + evaluators: Optional[list[Any]], max_concurrency: int, run_id: Optional[str] = None, - ) -> Optional[Dict[str, Any]]: - """ - Async implementation of run_test_suite. + ) -> Optional[dict[str, Any]]: + """Async implementation of run_test_suite. Args: name: The name of the run. @@ -277,17 +286,15 @@ async def _run_test_suite_async( """ items = list(data.items) total_items = len(items) - max_workers = max(5, max_concurrency) - results: List[Dict[str, Any]] = [] - bg_eval_tasks: List[asyncio.Task[None]] = [] + results: list[dict[str, Any]] = [] + background_evaluator_tasks: list[asyncio.Task[None]] = [] completed_count = 0 lock = asyncio.Lock() loop = asyncio.get_running_loop() async def on_item_completed(result: ItemProcessingResult) -> None: - """ - Handle completion of a single item processing. + """Handle completion of a single item processing. Args: result: The result of item processing. @@ -297,42 +304,41 @@ async def on_item_completed(result: ItemProcessingResult) -> None: results.append(result.item_entry) if result.should_run_evaluators and run_id: eval_task = asyncio.create_task(self._run_evaluators_for_item(run_id, result.ctx, evaluators or [])) - bg_eval_tasks.append(eval_task) + background_evaluator_tasks.append(eval_task) completed_count += 1 logger.info( - "netra.evaluation: %d/%d items processed (status=%s)", + "%s: %d/%d items processed (status=%s)", + LOG_PREFIX, completed_count, total_items, result.status, ) - def process_item_sync(idx: int, item: Any) -> ItemProcessingResult: - """ - Synchronous wrapper for thread pool execution. + def process_item_sync(idx: int, dataset_item: Any) -> ItemProcessingResult: + """Synchronous wrapper for thread pool execution. Args: idx: The index of the item. - item: The dataset item to process. + dataset_item: The dataset item to process. """ - return run_async_safely(self._process_single_item(idx, item, run_id, name, task, evaluators)) + return run_async_safely(self._process_single_item(idx, dataset_item, run_id, name, task, evaluators)) - async def process_item(idx: int, item: Any) -> None: - """ - Process a single item and handle its completion. + async def process_item(idx: int, dataset_item: Any) -> None: + """Process a single item and handle its completion. Args: idx: The index of the item. - item: The dataset item to process. + dataset_item: The dataset item to process. """ - result = await loop.run_in_executor(executor, process_item_sync, idx, item) + result = await loop.run_in_executor(executor, process_item_sync, idx, dataset_item) await on_item_completed(result) - with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + with concurrent.futures.ThreadPoolExecutor(max_workers=max_concurrency) as executor: await asyncio.gather(*[process_item(i, item) for i, item in enumerate(items)]) - if bg_eval_tasks: - await asyncio.gather(*bg_eval_tasks, return_exceptions=True) + if background_evaluator_tasks: + await asyncio.gather(*background_evaluator_tasks, return_exceptions=True) self._client.post_run_status(run_id, "completed") # type:ignore[arg-type] return {"runId": run_id, "items": results} @@ -344,10 +350,9 @@ async def _process_single_item( run_id: Optional[str], run_name: str, task: Callable[[Any], Any], - evaluators: Optional[List[Any]], + evaluators: Optional[list[Any]], ) -> ItemProcessingResult: - """ - Process a single dataset item through the execution pipeline. + """Process a single dataset item through the execution pipeline. Args: idx: Index of the item in the dataset. @@ -386,15 +391,14 @@ async def _process_single_item( ) def _create_item_context(self, idx: int, item: Any) -> ItemContext: - """ - Create an ItemContext from a dataset item. + """Create an ItemContext from a dataset item. Args: idx: The index of the item. item: The dataset item. Returns: - ItemContext: The created ItemContext. + The created ItemContext. """ if isinstance(item, DatasetRecord): return ItemContext( @@ -416,22 +420,21 @@ async def _execute_item_pipeline( run_name: str, ctx: ItemContext, task: Callable[[Any], Any], - ) -> Dict[str, Any]: - """ - Execute the full pipeline for a single item. + ) -> dict[str, Any]: + """Execute the full pipeline for a single item. Args: run_id: The run ID. run_name: The name of the run. ctx: The item context. task: The task function to execute. - evaluators: Optional list of evaluators. - results: List to append results to. - bg_eval_tasks: List to append background evaluation tasks to. + + Returns: + A dict containing the item processing status. """ - span_name = f"TestRun.{run_name}" + span_name = f"{SPAN_NAME_PREFIX}.{run_name}" - with SpanWrapper(span_name, module_name="netra.evaluation") as span: + with SpanWrapper(span_name, module_name=LOG_PREFIX) as span: otel_span = span.get_current_span() if otel_span: span_context = otel_span.get_span_context() @@ -446,55 +449,43 @@ async def _execute_item_pipeline( "status": ctx.status, } - def _post_triggered_status(self, run_id: str, ctx: ItemContext) -> str: - """ - Post agent_triggered status and return test_run_item_id. + def _post_completed_status(self, run_id: str, ctx: ItemContext) -> Optional[str]: + """Post completed/failed status with task output. Args: run_id: The run ID. ctx: The item context. Returns: - str: The test_run_item_id. - """ - payload = build_item_payload(ctx, status="agent_triggered") - response = self._client.post_run_item(run_id, payload) - - if isinstance(response, dict): - item_id = response.get("id") or response.get("testRunItemId") - if item_id: - return str(item_id) - return f"local-{ctx.index}" - - def _post_completed_status(self, run_id: str, ctx: ItemContext) -> Any: - """ - Post completed/failed status with task output. - - Args: - run_id: The run ID. - ctx: The item context. + The run item id from the backend, or None. """ payload = build_item_payload(ctx, status=ctx.status, include_output=True) - run_item_id = self._client.post_run_item(run_id, payload) - return run_item_id + return self._client.post_run_item(run_id, payload) async def _run_evaluators_for_item( self, run_id: str, ctx: ItemContext, - evaluators: List[Any], + evaluators: list[Any], ) -> None: - """ - Run all evaluators for a single item after span ingestion. + """Run all evaluators for a single item after span ingestion. Args: run_id: The run ID. ctx: The item context. evaluators: List of evaluators. """ - await self._client.wait_for_span_ingestion(ctx.span_id) + ingestion_ok = await self._client.wait_for_span_ingestion(ctx.span_id) + if not ingestion_ok: + logger.warning( + "%s: Span ingestion timed out for item %d (span_id=%s), skipping evaluator submission", + LOG_PREFIX, + ctx.index, + ctx.span_id, + ) + return - evaluator_results: List[Dict[str, Any]] = [] + evaluator_results: list[dict[str, Any]] = [] for evaluator in evaluators: try: result = await run_single_evaluator( @@ -506,7 +497,14 @@ async def _run_evaluators_for_item( ) if result: evaluator_results.append(result) - except Exception: + except Exception as exc: + logger.error( + "%s: Evaluator '%s' failed for item %d: %s", + LOG_PREFIX, + getattr(getattr(evaluator, "config", None), "name", "unknown"), + ctx.index, + exc, + ) continue if evaluator_results and ctx.test_run_item_id: diff --git a/netra/evaluation/client.py b/netra/evaluation/client.py index b3881c1..23e5c42 100644 --- a/netra/evaluation/client.py +++ b/netra/evaluation/client.py @@ -1,34 +1,71 @@ import asyncio import logging -import os import time -from typing import Any, Dict, List, Optional +from typing import Any, Optional import httpx from netra.config import Config +from netra.evaluation.constants import ( + DEFAULT_TIMEOUT, + ENV_TIMEOUT, + LOG_PREFIX, + TELEMETRY_SUFFIX, + URL_CREATE_DATASET, + URL_CREATE_RUN, + URL_DATASET_ITEMS, + URL_GET_DATASET, + URL_GET_RUN, + URL_LOCAL_EVALUATIONS, + URL_RUN_ITEM, + URL_RUN_STATUS, + URL_SPAN, +) from netra.evaluation.models import DatasetItem, TurnType +from netra.evaluation.utils import parse_env_float logger = logging.getLogger(__name__) class EvaluationHttpClient: + """Internal HTTP client for Evaluation APIs. + + Attributes: + _client: The underlying httpx client instance. """ - Internal HTTP client for Evaluation APIs. - """ + + __slots__ = ("_client",) def __init__(self, config: Config) -> None: - """ - Initialize HTTP client for evaluation endpoints. + """Initialize HTTP client for evaluation endpoints. Args: config: The configuration object. """ self._client: Optional[httpx.Client] = self._create_client(config) - def _create_client(self, config: Config) -> Optional[httpx.Client]: + def close(self) -> None: + """Close the underlying HTTP client and release connection resources.""" + if self._client: + try: + self._client.close() + except Exception: + logger.debug("%s: Error closing HTTP client", LOG_PREFIX, exc_info=True) + finally: + self._client = None + + def _ensure_client(self) -> Optional[httpx.Client]: + """Return the underlying client, logging an error if it is not initialized. + + Returns: + The httpx client, or None if not available. """ - Create an HTTP client for evaluation endpoints. + if not self._client: + logger.error("%s: Client not initialized", LOG_PREFIX) + return self._client + + def _create_client(self, config: Config) -> Optional[httpx.Client]: + """Create an HTTP client for evaluation endpoints. Args: config: The configuration object. @@ -38,22 +75,21 @@ def _create_client(self, config: Config) -> Optional[httpx.Client]: """ endpoint = (config.otlp_endpoint or "").strip() if not endpoint: - logger.error("netra.evaluation: NETRA_OTLP_ENDPOINT is required for evaluation APIs") + logger.error("%s: NETRA_OTLP_ENDPOINT is required for evaluation APIs", LOG_PREFIX) return None base_url = self._resolve_base_url(endpoint) headers = self._build_headers(config) - timeout = self._get_timeout() + timeout = parse_env_float(ENV_TIMEOUT, DEFAULT_TIMEOUT) try: return httpx.Client(base_url=base_url, headers=headers, timeout=timeout) except Exception as exc: - logger.error("netra.evaluation: Failed to initialize evaluation HTTP client: %s", exc) + logger.error("%s: Failed to initialize evaluation HTTP client: %s", LOG_PREFIX, exc) return None def _resolve_base_url(self, endpoint: str) -> str: - """ - Resolve base URL from endpoint. + """Extract base URL, removing telemetry suffix if present. Args: endpoint: The endpoint to resolve. @@ -62,154 +98,195 @@ def _resolve_base_url(self, endpoint: str) -> str: The resolved base URL. """ base_url = endpoint.rstrip("/") - if base_url.endswith("/telemetry"): - base_url = base_url[: -len("/telemetry")] + if base_url.endswith(TELEMETRY_SUFFIX): + base_url = base_url[: -len(TELEMETRY_SUFFIX)] return base_url - def _build_headers(self, config: Config) -> Dict[str, str]: - """ - Build Headers for Evaluation Client + def _build_headers(self, config: Config) -> dict[str, str]: + """Build request headers from configuration. Args: config: The configuration object. Returns: - The headers for evaluation client. + Dictionary of HTTP headers. """ - headers: Dict[str, str] = dict(config.headers or {}) - api_key = config.api_key - if api_key: - headers["x-api-key"] = api_key + headers: dict[str, str] = dict(config.headers or {}) + if config.api_key: + headers["x-api-key"] = config.api_key return headers - def _get_timeout(self) -> float: - """ - Get timeout for evaluation client. + def _extract_error_message( + self, + response: Optional[httpx.Response], + exc: Exception, + ) -> str: + """Extract error message from response or exception. - Returns: - The timeout for evaluation client. - """ - timeout_env = os.getenv("NETRA_EVALUATION_TIMEOUT") - if not timeout_env: - return 10.0 - try: - return float(timeout_env) - except ValueError: - logger.warning( - "netra.evaluation: Invalid NETRA_EVALUATION_TIMEOUT value '%s', using default 10.0", - timeout_env, - ) - return 10.0 + Args: + response: The HTTP response object, if available. + exc: The exception that was raised. - def create_dataset( - self, name: Optional[str], tags: Optional[List[str]] = None, turn_type: TurnType = TurnType.SINGLE - ) -> Any: + Returns: + A descriptive error message string. """ - Create an empty dataset + if response is not None: + try: + response_json = response.json() + error_data = response_json.get("error", {}) + if isinstance(error_data, dict): + msg = error_data.get("message") + if isinstance(msg, str): + return msg + except Exception: + logger.debug("%s: Could not parse error from response body", LOG_PREFIX, exc_info=True) + return str(exc) + + def _post_data( + self, + url: str, + payload: dict[str, Any], + error_context: str, + ) -> Optional[dict[str, Any]]: + """Send a POST request and return the unwrapped ``data`` envelope. Args: - name: The name of the dataset. - tags: Optional list of tags to associate with the dataset. - turn_type: The turn type of the dataset, either "single" or "multi". Defaults to "single". + url: The endpoint URL. + payload: The JSON payload to send. + error_context: Description used in the error log message. Returns: - A backend JSON response containing dataset info (id, name, tags, etc.) on success, - or None if creation fails. + The ``data`` dict from the response envelope, or None on failure. """ - if not self._client: - logger.error("netra.evaluation: Evaluation client is not initialized; cannot create dataset") + client = self._ensure_client() + if not client: return None + + response: Optional[httpx.Response] = None try: - url = "/evaluations/dataset" - payload: Dict[str, Any] = {"name": name, "tags": tags if tags else [], "turnType": turn_type.value} - response = self._client.post(url, json=payload) + response = client.post(url, json=payload) response.raise_for_status() - data = response.json() - if isinstance(data, dict) and "data" in data: - logger.info("netra.evaluation: Dataset created successfully") - return data.get("data", {}) - except Exception: - response_json = response.json() - logger.error( - "netra.evaluation: Failed to create dataset: %s", response_json.get("error").get("message", "") - ) + body = response.json() + if isinstance(body, dict) and "data" in body: + result = body["data"] + return result if isinstance(result, dict) else None + return None + except Exception as exc: + error_msg = self._extract_error_message(response, exc) + logger.error("%s: %s: %s", LOG_PREFIX, error_context, error_msg) return None - def add_dataset_item(self, dataset_id: str, item: DatasetItem) -> Any: - """ - Add a single item to an existing dataset and return backend data (e.g., new item id). + def _get_data( + self, + url: str, + error_context: str, + ) -> Optional[Any]: + """Send a GET request and return the unwrapped ``data`` envelope. Args: - dataset_id: The id of the dataset to which the item will be added. - item_payload: The dataset item to add. + url: The endpoint URL. + error_context: Description used in the error log message. Returns: - A backend JSON response on success or {"success": False} on error. + The ``data`` value from the response envelope, or None on failure. """ - if not self._client: - logger.error("netra.evaluation: Evaluation client is not initialized; cannot add item to dataset") - return {"success": False} + client = self._ensure_client() + if not client: + return None + + response: Optional[httpx.Response] = None try: - url = f"/evaluations/dataset/{dataset_id}/items" - item_payload: Dict[str, Any] = { - "input": item.input if item.input else None, - "expectedOutput": item.expected_output if item.expected_output else None, - "tags": item.tags if item.tags else None, - "metadata": item.metadata if item.metadata else None, - } - response = self._client.post(url, json=item_payload) + response = client.get(url) response.raise_for_status() - data = response.json() - if isinstance(data, dict) and "data" in data: - logger.info("netra.evaluation: Dataset item added successfully") - return data.get("data", {}) - except Exception: - response_json = response.json() - logger.error( - "netra.evaluation: Failed to add item to dataset '%s': %s", - dataset_id, - response_json.get("error").get("message", ""), - ) + body = response.json() + if isinstance(body, dict) and "data" in body: + return body["data"] return None + except Exception as exc: + error_msg = self._extract_error_message(response, exc) + logger.error("%s: %s: %s", LOG_PREFIX, error_context, error_msg) + return None + + def create_dataset( + self, + name: Optional[str], + tags: Optional[list[str]] = None, + turn_type: TurnType = TurnType.SINGLE, + ) -> Optional[dict[str, Any]]: + """Create an empty dataset. + + Args: + name: The name of the dataset. + tags: Optional list of tags to associate with the dataset. + turn_type: The turn type of the dataset. Defaults to "single". + + Returns: + A backend JSON response containing dataset info on success, or None on failure. + """ + payload: dict[str, Any] = {"name": name, "tags": tags if tags else [], "turnType": turn_type.value} + result = self._post_data(URL_CREATE_DATASET, payload, "Failed to create dataset") + if result is not None: + logger.info("%s: Dataset created successfully", LOG_PREFIX) + return result - def get_dataset(self, dataset_id: str) -> Any: + def add_dataset_item(self, dataset_id: str, item: DatasetItem) -> Optional[dict[str, Any]]: + """Add a single item to an existing dataset. + + Args: + dataset_id: The id of the dataset to which the item will be added. + item: The dataset item to add. + + Returns: + A backend JSON response on success, or None on failure. """ - Fetch dataset items for a dataset id. + url = URL_DATASET_ITEMS.format(dataset_id=dataset_id) + item_payload: dict[str, Any] = { + "input": item.input if item.input else None, + "expectedOutput": item.expected_output if item.expected_output else None, + "tags": item.tags if item.tags else None, + "metadata": item.metadata if item.metadata else None, + } + result = self._post_data(url, item_payload, f"Failed to add item to dataset '{dataset_id}'") + if result is not None: + logger.info("%s: Dataset item added successfully", LOG_PREFIX) + return result + + def get_dataset(self, dataset_id: str) -> Optional[list[dict[str, Any]]]: + """Fetch dataset items for a dataset id. Args: dataset_id: The id of the dataset to fetch. Returns: - A list of dataset items. + A list of dataset item dicts, or None on failure. """ - if not self._client: - logger.error("netra.evaluation: Evaluation client is not initialized; cannot fetch dataset") - return {"success": False} + client = self._ensure_client() + if not client: + return None + + response: Optional[httpx.Response] = None try: - url = f"/evaluations/dataset/{dataset_id}" - response = self._client.get(url) + url = URL_GET_DATASET.format(dataset_id=dataset_id) + response = client.get(url) response.raise_for_status() - data = response.json() - if isinstance(data, dict) and "data" in data: - logger.info("netra.evaluation: Dataset fetched successfully") - return data.get("data", []) - except Exception: - response_json = response.json() - logger.error( - "netra.evaluation: Failed to fetch dataset '%s': %s", - dataset_id, - response_json.get("error").get("message", ""), - ) + body = response.json() + if isinstance(body, dict) and "data" in body: + logger.info("%s: Dataset fetched successfully", LOG_PREFIX) + data = body["data"] + return data if isinstance(data, list) else None + return None + except Exception as exc: + error_msg = self._extract_error_message(response, exc) + logger.error("%s: Failed to fetch dataset '%s': %s", LOG_PREFIX, dataset_id, error_msg) return None def create_run( self, name: str, dataset_id: Optional[str] = None, - evaluators_config: Optional[List[Dict[str, Any]]] = None, - ) -> Any: - """ - Create a new run based on the provided name, dataset_id, and evaluators_config. + evaluators_config: Optional[list[dict[str, Any]]] = None, + ) -> Optional[dict[str, Any]]: + """Create a new run based on the provided name, dataset_id, and evaluators_config. Args: name: The name of the run. @@ -217,166 +294,92 @@ def create_run( evaluators_config: Optional list of evaluators to be used for the run. Returns: - A backend JSON response containing run_id + A backend JSON response containing run info, or None on failure. """ - if not self._client: - logger.error("netra.evaluation: Evaluation client is not initialized; cannot create run") - return {"success": False} - try: - url = f"/evaluations/test_run" - payload: Dict[str, Any] = { - "name": name, - "datasetId": dataset_id if dataset_id else None, - "localEvaluators": evaluators_config if evaluators_config else [], - } - response = self._client.post(url, json=payload) - response.raise_for_status() - data = response.json() - if isinstance(data, dict) and "data" in data: - return data.get("data", {}) - except Exception: - response_json = response.json() - logger.error( - "netra.evaluation: Failed to create run '%s': %s", name, response_json.get("error").get("message", "") - ) - return {"success": False} + payload: dict[str, Any] = { + "name": name, + "datasetId": dataset_id if dataset_id else None, + "localEvaluators": evaluators_config if evaluators_config else [], + } + return self._post_data(URL_CREATE_RUN, payload, f"Failed to create run '{name}'") - def post_run_item(self, run_id: str, payload: Dict[str, Any]) -> Any: - """ - Submit a new run item to the backend. + def post_run_item(self, run_id: str, payload: dict[str, Any]) -> Optional[str]: + """Submit a new run item to the backend. Args: run_id: The id of the run to which the item will be added. payload: The run item to add. Returns: - A backend JSON response on success or {"success": False} on error. + The run item id on success, or None on failure. """ - if not self._client: - logger.error("netra.evaluation: Evaluation client is not initialized; cannot post run item") - return {"success": False} - try: - url = f"/evaluations/run/{run_id}/item" - response = self._client.post(url, json=payload) - response.raise_for_status() - data = response.json() - if isinstance(data, dict) and "data" in data: - run_item = data.get("data", {}).get("item") - run_item_id = run_item.get("id") - return run_item_id - return data - except Exception: - response_json = response.json() - logger.error( - "netra.evaluation: Failed to post run item for run '%s': %s", - run_id, - response_json.get("error").get("message", ""), - ) - return {"success": False} + url = URL_RUN_ITEM.format(run_id=run_id) + result = self._post_data(url, payload, f"Failed to post run item for run '{run_id}'") + if result is not None: + run_item = result.get("item", {}) + if isinstance(run_item, dict): + item_id = run_item.get("id") + return str(item_id) if item_id is not None else None + return None def submit_local_evaluations( - self, run_id: str, test_run_item_id: str, evaluator_results: List[Dict[str, Any]] - ) -> Any: - """ - Submit local evaluations result + self, + run_id: str, + test_run_item_id: str, + evaluator_results: list[dict[str, Any]], + ) -> Optional[dict[str, Any]]: + """Submit local evaluations result. Args: - run_id: The id of the run to which the item will be added. + run_id: The id of the run. test_run_item_id: The id of the test run item. evaluator_results: The evaluator results to submit. Returns: - A backend JSON response containing confirmation of the submission. + A backend JSON response containing confirmation, or None on failure. """ - if not self._client: - logger.error("netra.evaluation: Evaluation client is not initialized; cannot submit local evaluations") - return {"success": False} - try: - url = f"/evaluations/run/{run_id}/item/{test_run_item_id}/local-evaluations" - payload: Dict[str, Any] = {"evaluatorResults": evaluator_results} - response = self._client.post(url, json=payload) - response.raise_for_status() - data = response.json() - if isinstance(data, dict) and "data" in data: - return data.get("data", {}) - return data - except Exception: - response_json = response.json() - logger.error( - "netra.evaluation: Failed to submit local evaluations for run '%s', item '%s': %s", - run_id, - test_run_item_id, - response_json.get("error").get("message", ""), - ) - return {"success": False} - - def post_run_status(self, run_id: str, status: str) -> Any: - """ - Submit the run status + url = URL_LOCAL_EVALUATIONS.format(run_id=run_id, test_run_item_id=test_run_item_id) + request_payload: dict[str, Any] = {"evaluatorResults": evaluator_results} + return self._post_data( + url, + request_payload, + f"Failed to submit local evaluations for run '{run_id}', item '{test_run_item_id}'", + ) + + def post_run_status(self, run_id: str, status: str) -> Optional[dict[str, Any]]: + """Submit the run status. Args: - run_id: The id of the run to which the item will be added. - status: The status of the run. + run_id: The id of the run to update. + status: The status of the run. - Returns: - A backend JSON response containing confirmation of the submission. - """ - if not self._client: - logger.error("netra.evaluation: Evaluation client is not initialized; cannot post run status") - return {"success": False} - try: - url = f"/evaluations/run/{run_id}/status" - payload: Dict[str, Any] = {"status": status} - response = self._client.post(url, json=payload) - response.raise_for_status() - data = response.json() - if isinstance(data, dict) and "data" in data: - logger.info("netra.evaluation: Completed test run successfully") - return data.get("data", {}) - return data - except Exception: - response_json = response.json() - logger.error( - "netra.evaluation: Failed to post run status for run '%s': %s", - run_id, - response_json.get("error").get("message", ""), - ) - return {"success": False} - - def get_run_results(self, run_id: str) -> Any: + Returns: + A backend JSON response containing confirmation, or None on failure. """ - Fetch test run results by run ID. + payload: dict[str, Any] = {"status": status} + url = URL_RUN_STATUS.format(run_id=run_id) + result = self._post_data(url, payload, f"Failed to post run status for run '{run_id}'") + if result is not None: + logger.info("%s: Test run status updated to '%s'", LOG_PREFIX, status) + return result + + def get_run_results(self, run_id: str) -> Optional[dict[str, Any]]: + """Fetch test run results by run ID. Args: run_id: The id of the run to fetch. Returns: - A JSON response containing run results. + A JSON response containing run results, or None on failure. """ - if not self._client: - logger.error("netra.evaluation: Evaluation client is not initialized; cannot fetch run") - return None - try: - url = f"/evaluations/run/{run_id}" - response = self._client.get(url) - response.raise_for_status() - data = response.json() - if isinstance(data, dict) and "data" in data: - logger.info("netra.evaluation: Run fetched successfully") - return data.get("data", {}) - return data - except Exception: - response_json = response.json() - logger.error( - "netra.evaluation: Failed to fetch run results for run '%s': %s", - run_id, - response_json.get("error").get("message", ""), - ) - return None + url = URL_GET_RUN.format(run_id=run_id) + result = self._get_data(url, f"Failed to fetch run results for run '{run_id}'") + if result is not None: + logger.info("%s: Run fetched successfully", LOG_PREFIX) + return result if isinstance(result, dict) else None - def get_span_by_id(self, span_id: str) -> Any: - """ - Check if a span exists in the backend. + def get_span_by_id(self, span_id: str) -> Optional[dict[str, Any]]: + """Check if a span exists in the backend. Args: span_id: The span ID to check. @@ -384,17 +387,18 @@ def get_span_by_id(self, span_id: str) -> Any: Returns: The span data if found, None otherwise. """ - if not self._client: - logger.error("netra.evaluation: Evaluation client is not initialized; cannot get span") + if not self._ensure_client(): return None + try: - url = f"sdk/traces/spans/{span_id}" - response = self._client.get(url) + url = URL_SPAN.format(span_id=span_id) + response = self._client.get(url) # type:ignore[union-attr] response.raise_for_status() - data = response.json() - if isinstance(data, dict): - return data.get("data", data) - return data + body = response.json() + if isinstance(body, dict): + data = body.get("data", body) + return data if isinstance(data, dict) else None + return None except Exception: return None @@ -405,8 +409,7 @@ async def wait_for_span_ingestion( poll_interval_seconds: float = 1.0, initial_delay_seconds: float = 0.5, ) -> bool: - """ - Wait until a span is available in the backend. + """Wait until a span is available in the backend. Polls the GET /spans/:id endpoint to verify span availability before running evaluators. diff --git a/netra/evaluation/constants.py b/netra/evaluation/constants.py new file mode 100644 index 0000000..d22b6bd --- /dev/null +++ b/netra/evaluation/constants.py @@ -0,0 +1,37 @@ +"""Shared constants for the evaluation module.""" + +# --------------------------------------------------------------------------- +# Logging +# --------------------------------------------------------------------------- +LOG_PREFIX = "netra.evaluation" + +# --------------------------------------------------------------------------- +# Concurrency +# --------------------------------------------------------------------------- +MIN_CONCURRENCY = 1 +DEFAULT_CONCURRENCY = 5 + +# --------------------------------------------------------------------------- +# Span / tracing +# --------------------------------------------------------------------------- +SPAN_NAME_PREFIX = "TestRun" + +# --------------------------------------------------------------------------- +# API endpoints +# --------------------------------------------------------------------------- +URL_CREATE_DATASET = "/evaluations/dataset" +URL_DATASET_ITEMS = "/evaluations/dataset/{dataset_id}/items" +URL_GET_DATASET = "/evaluations/dataset/{dataset_id}" +URL_CREATE_RUN = "/evaluations/test_run" +URL_RUN_ITEM = "/evaluations/run/{run_id}/item" +URL_LOCAL_EVALUATIONS = "/evaluations/run/{run_id}/item/{test_run_item_id}/local-evaluations" +URL_RUN_STATUS = "/evaluations/run/{run_id}/status" +URL_GET_RUN = "/evaluations/run/{run_id}" +URL_SPAN = "sdk/traces/spans/{span_id}" +TELEMETRY_SUFFIX = "/telemetry" + +# --------------------------------------------------------------------------- +# HTTP client timeouts +# --------------------------------------------------------------------------- +DEFAULT_TIMEOUT = 10.0 +ENV_TIMEOUT = "NETRA_EVALUATION_TIMEOUT" diff --git a/netra/evaluation/evaluator.py b/netra/evaluation/evaluator.py index e46e58f..5a01035 100644 --- a/netra/evaluation/evaluator.py +++ b/netra/evaluation/evaluator.py @@ -31,6 +31,7 @@ def evaluate(self, context: EvaluatorContext) -> EvaluatorOutput: ) # Usage: + dataset = ... result = Netra.evaluation.run_test_suite( name="Copywriting Assistant v1", data=dataset, diff --git a/netra/evaluation/models.py b/netra/evaluation/models.py index c0ab140..f12397b 100644 --- a/netra/evaluation/models.py +++ b/netra/evaluation/models.py @@ -1,16 +1,19 @@ -import asyncio -from dataclasses import dataclass, field +"""Data models for the evaluation module.""" + +from dataclasses import dataclass from enum import Enum -from typing import Any, Dict, List, Optional +from typing import Any, Optional from pydantic import BaseModel, Field class CreateDatasetResponse(BaseModel): # type:ignore[misc] + """Response from creating a dataset.""" + project_id: str organization_id: str name: str - tags: Optional[List[str]] = [] + tags: Optional[list[str]] = Field(default_factory=list) created_by: str updated_by: str updated_at: str @@ -20,6 +23,8 @@ class CreateDatasetResponse(BaseModel): # type:ignore[misc] class AddDatasetItemResponse(BaseModel): # type:ignore[misc] + """Response from adding a dataset item.""" + dataset_id: str project_id: str organization_id: str @@ -27,18 +32,20 @@ class AddDatasetItemResponse(BaseModel): # type:ignore[misc] input: Any expected_output: Optional[Any] = None is_active: bool - tags: Optional[List[str]] = [] + tags: Optional[list[str]] = Field(default_factory=list) created_by: str updated_by: str updated_at: str source_id: Optional[str] = None - metadata: Optional[Dict[str, Any]] = None + metadata: Optional[dict[str, Any]] = None id: str created_at: str deleted_at: Optional[str] = None class DatasetRecord(BaseModel): # type:ignore[misc] + """A single record fetched from a remote dataset.""" + id: str input: Any dataset_id: str @@ -46,23 +53,31 @@ class DatasetRecord(BaseModel): # type:ignore[misc] class GetDatasetItemsResponse(BaseModel): # type:ignore[misc] - items: List[DatasetRecord] + """Response from fetching dataset items.""" + + items: list[DatasetRecord] class DatasetItem(BaseModel): # type:ignore[misc] + """A single dataset item provided by the user.""" + input: Any expected_output: Optional[Any] = None - metadata: Optional[Dict[str, Any]] = None - tags: Optional[List[str]] = None + metadata: Optional[dict[str, Any]] = None + tags: Optional[list[str]] = None class ScoreType(str, Enum): + """Supported evaluator score types.""" + BOOLEAN = "boolean" NUMERICAL = "numerical" CATEGORICAL = "categorical" class EvaluatorConfig(BaseModel): # type:ignore[misc] + """Configuration for a single evaluator.""" + name: str label: str score_type: ScoreType = Field(alias="scoreType") @@ -73,13 +88,17 @@ class EvaluatorConfig(BaseModel): # type:ignore[misc] class EvaluatorContext(BaseModel): # type:ignore[misc] + """Context passed to an evaluator's evaluate() method.""" + input: Any task_output: Any expected_output: Any = None - metadata: Optional[Dict[str, Any]] = None + metadata: Optional[dict[str, Any]] = None class EvaluatorOutput(BaseModel): # type:ignore[misc] + """Result returned from an evaluator's evaluate() method.""" + evaluator_name: str result: Any is_passed: bool @@ -87,17 +106,19 @@ class EvaluatorOutput(BaseModel): # type:ignore[misc] class Dataset(BaseModel): # type:ignore[misc] - items: List[DatasetItem] | List[DatasetRecord] + """Container for dataset items used by run_test_suite.""" + items: list[DatasetItem] | list[DatasetRecord] -@dataclass + +@dataclass(slots=True) class ItemContext: """Context for a single dataset item during test suite execution.""" index: int item_input: Any expected_output: Any = None - metadata: Optional[Dict[str, Any]] = None + metadata: Optional[dict[str, Any]] = None dataset_item_id: Optional[str] = None trace_id: str = "" span_id: str = "" @@ -107,34 +128,24 @@ class ItemContext: status: str = "pending" -@dataclass -class RunContext: - """Shared context for a test suite run.""" - - run_id: str - run_name: str - evaluators: Optional[List[Any]] = None - poller: Optional[Any] = None - results: List[Dict[str, Any]] = field(default_factory=list) - bg_eval_tasks: List[asyncio.Task[None]] = field(default_factory=list) - - class LocalDataset(BaseModel): # type:ignore[misc] """Local dataset class for running test suite locally.""" - items: List[DatasetItem] + items: list[DatasetItem] class TurnType(str, Enum): + """Turn type for a dataset.""" + SINGLE = "single" MULTI = "multi" -@dataclass +@dataclass(slots=True) class ItemProcessingResult: """Result of processing a single dataset item.""" - item_entry: Dict[str, Any] + item_entry: dict[str, Any] should_run_evaluators: bool ctx: ItemContext status: str diff --git a/netra/evaluation/utils.py b/netra/evaluation/utils.py index a40aeeb..6c7181e 100644 --- a/netra/evaluation/utils.py +++ b/netra/evaluation/utils.py @@ -1,24 +1,53 @@ +"""Utility functions for the evaluation module.""" + import asyncio import logging +import os import threading -from typing import Any, Awaitable, Callable, Dict, List, Optional, TypeVar +from typing import Any, Awaitable, Callable, Optional, TypeVar from opentelemetry import baggage from opentelemetry import context as otel_context +from netra.evaluation.constants import LOG_PREFIX from netra.evaluation.models import DatasetRecord, EvaluatorConfig, EvaluatorContext, ItemContext logger = logging.getLogger(__name__) -T = TypeVar("T") +_T = TypeVar("_T") -def get_session_id_from_baggage() -> Optional[str]: +def parse_env_float(env_var: str, default: float) -> float: + """Read an environment variable and parse it as a float. + + Args: + env_var: Name of the environment variable. + default: Value to return when the variable is unset or invalid. + + Returns: + The parsed float, or *default* on failure. """ - Get the session ID from the OpenTelemetry baggage. + raw = os.getenv(env_var) + if not raw: + return default + try: + return float(raw) + except ValueError: + logger.warning( + "%s: Invalid value '%s' for %s, using default %.1f", + LOG_PREFIX, + raw, + env_var, + default, + ) + return default + + +def get_session_id_from_baggage() -> Optional[str]: + """Get the session ID from the OpenTelemetry baggage. Returns: - session_id: The session ID if found, None otherwise. + The session ID if found, None otherwise. """ ctx = otel_context.get_current() session_id = baggage.get_baggage("session_id", ctx) @@ -28,82 +57,80 @@ def get_session_id_from_baggage() -> Optional[str]: def format_trace_id(trace_id: int) -> str: - """ - Format the trace ID as a 32-digit hexadecimal string. + """Format the trace ID as a 32-digit hexadecimal string. + + Args: + trace_id: The integer trace ID to format. - Return: - trace_id: The formatted trace ID. + Returns: + The formatted trace ID. """ return f"{trace_id:032x}" def format_span_id(span_id: int) -> str: - """ - Format the span ID as a 16-digit hexadecimal string. - - Return: - span_id: The formatted span ID. - """ - return f"{span_id:016x}" + """Format the span ID as a 16-digit hexadecimal string. + Args: + span_id: The integer span ID to format. -async def run_callable_maybe_async(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: - """ - Run callable + Returns: + The formatted span ID. """ - result = fn(*args, **kwargs) - if asyncio.iscoroutine(result): - return await result - return result + return f"{span_id:016x}" -def run_async_safely(coroutine: Awaitable[T]) -> T: - """Run an async coroutine from sync code. +def run_async_safely(coro: Awaitable[_T]) -> _T: + """Run an async coroutine from synchronous code. - If there is already an event loop running in this thread, we execute in a - dedicated thread to avoid 'asyncio.run() cannot be called from a running event loop'. + When called from a context that already has a running event loop (e.g. a + Jupyter notebook, or an async framework like FastAPI), ``asyncio.run()`` + would raise. In that case we spin up a **new daemon thread** with its own + event loop so the caller's loop is never blocked or re-entered. Args: - coroutine: The coroutine to run. + coro: The coroutine to execute. Returns: - The result of the coroutine. - """ + The result of the coroutine execution. + Raises: + Exception: Re-raises any exception from the coroutine. + """ try: loop = asyncio.get_running_loop() except RuntimeError: loop = None if loop and loop.is_running(): - result_container: Dict[str, T] = {} - error_container: Dict[str, Exception] = {} + result_holder: dict[str, _T] = {} + error_holder: dict[str, BaseException] = {} - def _runner() -> None: + def runner() -> None: try: - result_container["result"] = asyncio.run(coroutine) # type: ignore[arg-type] - except Exception as exc: # pragma: no cover - error_container["error"] = exc + result_holder["value"] = asyncio.run(coro) # type: ignore[arg-type] + except BaseException as exc: + error_holder["exc"] = exc - thread = threading.Thread(target=_runner, daemon=True) + thread = threading.Thread(target=runner, daemon=True) thread.start() thread.join() - if "error" in error_container: - raise error_container["error"] - return result_container.get("result") # type: ignore[return-value] - return asyncio.run(coroutine) # type: ignore[arg-type] + if "exc" in error_holder: + raise error_holder["exc"] + return result_holder.get("value") # type: ignore[return-value] + + return asyncio.run(coro) # type: ignore[arg-type] def extract_evaluator_config(evaluator: Any) -> Optional[EvaluatorConfig]: - """ - Extract evaluator configuration from an evaluator object. + """Extract evaluator configuration from an evaluator object. Args: evaluator: The evaluator object. Returns: - Optional[EvaluatorConfig]: The evaluator configuration if found, None otherwise. + The evaluator configuration if found, None otherwise. """ if not hasattr(evaluator, "config"): return None @@ -114,15 +141,14 @@ def extract_evaluator_config(evaluator: Any) -> Optional[EvaluatorConfig]: async def execute_task(task: Callable[[Any], Any], item_input: Any) -> tuple[Any, str]: - """ - Execute a task function (sync or async) and return (output, status). + """Execute a task function (sync or async) and return (output, status). Args: task: The task function to execute. item_input: The input to the task function. Returns: - tuple[Any, str]: The output of the task function and the status of the execution. + A tuple of (task_output, status_string). """ try: result = task(item_input) @@ -138,10 +164,9 @@ async def run_single_evaluator( item_input: Any, task_output: Any, expected_output: Any, - metadata: Optional[Dict[str, Any]], -) -> Optional[Dict[str, Any]]: - """ - Run a single evaluator and return normalized result. + metadata: Optional[dict[str, Any]], +) -> Optional[dict[str, Any]]: + """Run a single evaluator and return normalized result. Args: evaluator: The evaluator object. @@ -151,7 +176,7 @@ async def run_single_evaluator( metadata: Optional metadata to be passed to the evaluator. Returns: - Optional[Dict[str, Any]]: The normalized result of the evaluator if successful, None otherwise. + The normalized result dict if successful, None otherwise. """ if not hasattr(evaluator, "evaluate"): return None @@ -185,22 +210,21 @@ async def run_single_evaluator( def build_item_payload( - ctx: "ItemContext", + ctx: ItemContext, status: str, include_output: bool = False, -) -> Dict[str, Any]: - """ - Build a payload dict for posting item status. +) -> dict[str, Any]: + """Build a payload dict for posting item status. Args: ctx: The item context. - status: The status of the item. + status: The status of the item (e.g. "completed", "failed"). include_output: Whether to include the task output in the payload. Returns: - Dict[str, Any]: The payload dict. + The payload dict ready for HTTP submission. """ - payload: Dict[str, Any] = { + payload: dict[str, Any] = { "traceId": ctx.trace_id, "sessionId": ctx.session_id, } @@ -213,7 +237,7 @@ def build_item_payload( if ctx.metadata: payload["metadata"] = ctx.metadata - if ctx.status == "failed": + if status == "failed": payload["status"] = "failed" return payload @@ -225,37 +249,35 @@ def build_item_payload( def validate_run_inputs( name: str, - data: Any, + dataset: Any, task: Callable[[Any], Any], ) -> None: - """ - Validate required inputs for run_test_suite. + """Validate required inputs for run_test_suite. Args: name: The name of the run. - data: The dataset to be used for the test suite. + dataset: The dataset to be used for the test suite. task: The task to be executed for each item in the dataset. Raises: ValueError: If any required input is missing or invalid. """ if not name: - raise ValueError("netra.evaluation: run name is required") - if not data: - raise ValueError("netra.evaluation: data is required") + raise ValueError(f"{LOG_PREFIX}: run name is required") + if not dataset: + raise ValueError(f"{LOG_PREFIX}: dataset is required") if task is None: - raise ValueError("netra.evaluation: task function is required") + raise ValueError(f"{LOG_PREFIX}: task function is required") -def extract_dataset_id(items: List[Any]) -> Optional[str]: # noqa: E501 - """ - Extract dataset_id from items if they are DatasetRecords. +def extract_dataset_id(items: list[Any]) -> Optional[str]: + """Extract dataset_id from items if they are DatasetRecords. Args: items: List of items. Returns: - Optional[str]: The dataset_id if found, None otherwise. + The dataset_id if found, None otherwise. """ if items and isinstance(items[0], DatasetRecord): dataset_id: str = items[0].dataset_id @@ -264,18 +286,17 @@ def extract_dataset_id(items: List[Any]) -> Optional[str]: # noqa: E501 def build_evaluators_config( - evaluators: Optional[List[Any]], -) -> List[EvaluatorConfig]: - """ - Build evaluator configurations from evaluator objects. + evaluators: Optional[list[Any]], +) -> list[EvaluatorConfig]: + """Build evaluator configurations from evaluator objects. Args: evaluators: List of evaluators. Returns: - List[EvaluatorConfig]: List of evaluator configurations. + List of evaluator configurations. """ - configs: List[EvaluatorConfig] = [] + configs: list[EvaluatorConfig] = [] if not evaluators: return configs @@ -285,6 +306,7 @@ def build_evaluators_config( continue try: configs.append(config) - except Exception: + except Exception as exc: + logger.warning("%s: Failed to extract evaluator config: %s", LOG_PREFIX, exc) continue return configs diff --git a/tests/test_evaluation.py b/tests/test_evaluation.py new file mode 100644 index 0000000..0a43e78 --- /dev/null +++ b/tests/test_evaluation.py @@ -0,0 +1,1215 @@ +""" +Unit tests for the netra/evaluation/ module. + +Covers models, utils, client, api, and evaluator layers with mocked +HTTP interactions and async helpers. +""" + +import asyncio +from typing import Any +from unittest.mock import MagicMock, patch + +import httpx +import pytest + +from netra.evaluation.evaluator import BaseEvaluator +from netra.evaluation.models import ( + AddDatasetItemResponse, + CreateDatasetResponse, + Dataset, + DatasetItem, + DatasetRecord, + EvaluatorConfig, + EvaluatorContext, + EvaluatorOutput, + GetDatasetItemsResponse, + ItemContext, + ItemProcessingResult, + LocalDataset, + ScoreType, + TurnType, +) +from netra.evaluation.utils import ( + build_evaluators_config, + build_item_payload, + execute_task, + extract_dataset_id, + format_span_id, + format_trace_id, + parse_env_float, + run_async_safely, + run_single_evaluator, + validate_run_inputs, +) + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +class PassEvaluator(BaseEvaluator): + """Evaluator that always passes.""" + + def evaluate(self, context: EvaluatorContext) -> EvaluatorOutput: + return EvaluatorOutput( + evaluator_name=self.config.name, + result=True, + is_passed=True, + reason="always passes", + ) + + +class FailEvaluator(BaseEvaluator): + """Evaluator that always fails.""" + + def evaluate(self, context: EvaluatorContext) -> EvaluatorOutput: + return EvaluatorOutput( + evaluator_name=self.config.name, + result=False, + is_passed=False, + reason="always fails", + ) + + +class AsyncEvaluator(BaseEvaluator): + """Async evaluator that always passes.""" + + async def evaluate(self, context: EvaluatorContext) -> EvaluatorOutput: # type: ignore[override] + return EvaluatorOutput( + evaluator_name=self.config.name, + result=True, + is_passed=True, + reason="async pass", + ) + + +class CrashingEvaluator(BaseEvaluator): + """Evaluator that raises an exception.""" + + def evaluate(self, context: EvaluatorContext) -> EvaluatorOutput: + raise RuntimeError("evaluator exploded") + + +def _make_evaluator_config(name: str = "test_eval", label: str = "Test Evaluator") -> EvaluatorConfig: + """Create a test EvaluatorConfig.""" + return EvaluatorConfig(name=name, label=label, scoreType=ScoreType.BOOLEAN) + + +def _make_config( + endpoint: str = "https://api.getnetra.ai/telemetry", + api_key: str = "key-1", +) -> MagicMock: + """Create a mock Config.""" + cfg = MagicMock() + cfg.otlp_endpoint = endpoint + cfg.api_key = api_key + cfg.headers = {} + return cfg + + +# --------------------------------------------------------------------------- +# Section 1: Models +# --------------------------------------------------------------------------- + + +class TestScoreType: + """Tests for ScoreType enum.""" + + def test_values(self) -> None: + assert ScoreType.BOOLEAN.value == "boolean" + assert ScoreType.NUMERICAL.value == "numerical" + assert ScoreType.CATEGORICAL.value == "categorical" + + +class TestTurnType: + """Tests for TurnType enum.""" + + def test_values(self) -> None: + assert TurnType.SINGLE.value == "single" + assert TurnType.MULTI.value == "multi" + + +class TestDatasetItem: + """Tests for DatasetItem model.""" + + def test_required_input(self) -> None: + item = DatasetItem(input="hello") + assert item.input == "hello" + assert item.expected_output is None + assert item.metadata is None + assert item.tags is None + + def test_all_fields(self) -> None: + item = DatasetItem( + input="hello", + expected_output="world", + metadata={"key": "val"}, + tags=["tag1"], + ) + assert item.expected_output == "world" + assert item.metadata == {"key": "val"} + assert item.tags == ["tag1"] + + +class TestDatasetRecord: + """Tests for DatasetRecord model.""" + + def test_creation(self) -> None: + record = DatasetRecord(id="r1", input="q", dataset_id="ds1") + assert record.id == "r1" + assert record.expected_output is None + + +class TestDataset: + """Tests for Dataset model.""" + + def test_with_dataset_items(self) -> None: + ds = Dataset(items=[DatasetItem(input="a"), DatasetItem(input="b")]) + assert len(ds.items) == 2 + + def test_with_dataset_records(self) -> None: + ds = Dataset( + items=[ + DatasetRecord(id="r1", input="a", dataset_id="ds1"), + DatasetRecord(id="r2", input="b", dataset_id="ds1"), + ] + ) + assert len(ds.items) == 2 + + +class TestEvaluatorConfig: + """Tests for EvaluatorConfig model.""" + + def test_alias(self) -> None: + config = EvaluatorConfig(name="e1", label="Eval 1", scoreType=ScoreType.BOOLEAN) + assert config.score_type == ScoreType.BOOLEAN + + def test_populate_by_name(self) -> None: + config = EvaluatorConfig(name="e1", label="Eval 1", scoreType=ScoreType.NUMERICAL) + assert config.score_type == ScoreType.NUMERICAL + + +class TestItemContext: + """Tests for ItemContext dataclass.""" + + def test_defaults(self) -> None: + ctx = ItemContext(index=0, item_input="hello") + assert ctx.status == "pending" + assert ctx.trace_id == "" + assert ctx.task_output is None + + def test_slots(self) -> None: + ctx = ItemContext(index=0, item_input="hello") + with pytest.raises(AttributeError): + ctx.nonexistent = True # type: ignore[attr-defined] + + +class TestItemProcessingResult: + """Tests for ItemProcessingResult dataclass.""" + + def test_creation(self) -> None: + ctx = ItemContext(index=0, item_input="x") + result = ItemProcessingResult( + item_entry={"index": 0}, + should_run_evaluators=True, + ctx=ctx, + status="completed", + ) + assert result.should_run_evaluators is True + + +class TestLocalDataset: + """Tests for LocalDataset model.""" + + def test_creation(self) -> None: + ld = LocalDataset(items=[DatasetItem(input="x")]) + assert len(ld.items) == 1 + + +class TestCreateDatasetResponse: + """Tests for CreateDatasetResponse model.""" + + def test_creation(self) -> None: + resp = CreateDatasetResponse( + project_id="p1", + organization_id="o1", + name="ds1", + created_by="user", + updated_by="user", + updated_at="2025-01-01", + id="id1", + created_at="2025-01-01", + ) + assert resp.id == "id1" + assert resp.deleted_at is None + + +class TestAddDatasetItemResponse: + """Tests for AddDatasetItemResponse model.""" + + def test_creation(self) -> None: + resp = AddDatasetItemResponse( + dataset_id="ds1", + project_id="p1", + organization_id="o1", + source="sdk", + input="hello", + is_active=True, + created_by="user", + updated_by="user", + updated_at="2025-01-01", + id="item1", + created_at="2025-01-01", + ) + assert resp.is_active is True + + +# --------------------------------------------------------------------------- +# Section 2: Utils +# --------------------------------------------------------------------------- + + +class TestParseEnvFloat: + """Tests for parse_env_float.""" + + def test_returns_default_when_unset(self) -> None: + assert parse_env_float("_NETRA_EVAL_TEST_NONEXISTENT_", 42.0) == 42.0 + + def test_parses_valid_value(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("_NETRA_EVAL_TEST_FLOAT_", "3.14") + assert parse_env_float("_NETRA_EVAL_TEST_FLOAT_", 1.0) == pytest.approx(3.14) + + def test_returns_default_on_invalid(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("_NETRA_EVAL_TEST_FLOAT_", "not-a-number") + assert parse_env_float("_NETRA_EVAL_TEST_FLOAT_", 7.0) == 7.0 + + +class TestFormatTraceId: + """Tests for format_trace_id.""" + + def test_zero(self) -> None: + assert format_trace_id(0) == "0" * 32 + + def test_known_value(self) -> None: + result = format_trace_id(255) + assert result == "0" * 30 + "ff" + assert len(result) == 32 + + +class TestFormatSpanId: + """Tests for format_span_id.""" + + def test_zero(self) -> None: + assert format_span_id(0) == "0" * 16 + + def test_known_value(self) -> None: + result = format_span_id(255) + assert result == "0" * 14 + "ff" + assert len(result) == 16 + + +class TestRunAsyncSafely: + """Tests for run_async_safely.""" + + def test_runs_coroutine(self) -> None: + async def coro() -> int: + return 42 + + assert run_async_safely(coro()) == 42 + + def test_propagates_exception(self) -> None: + async def coro() -> None: + raise ValueError("boom") + + with pytest.raises(ValueError, match="boom"): + run_async_safely(coro()) + + +class TestExecuteTask: + """Tests for execute_task.""" + + def test_sync_task(self) -> None: + def task(inp: str) -> str: + return f"result: {inp}" + + output, status = asyncio.run(execute_task(task, "hello")) + assert output == "result: hello" + assert status == "completed" + + def test_async_task(self) -> None: + async def task(inp: str) -> str: + return f"async: {inp}" + + output, status = asyncio.run(execute_task(task, "hello")) + assert output == "async: hello" + assert status == "completed" + + def test_failed_task(self) -> None: + def task(inp: str) -> str: + raise ValueError("task error") + + output, status = asyncio.run(execute_task(task, "hello")) + assert status == "failed" + assert "task error" in output + + +class TestValidateRunInputs: + """Tests for validate_run_inputs.""" + + def test_valid(self) -> None: + validate_run_inputs("name", Dataset(items=[DatasetItem(input="x")]), lambda x: x) + + def test_missing_name(self) -> None: + with pytest.raises(ValueError, match="run name is required"): + validate_run_inputs("", Dataset(items=[DatasetItem(input="x")]), lambda x: x) + + def test_missing_task(self) -> None: + with pytest.raises(ValueError, match="task function is required"): + validate_run_inputs("name", Dataset(items=[DatasetItem(input="x")]), None) # type: ignore[arg-type] + + +class TestExtractDatasetId: + """Tests for extract_dataset_id.""" + + def test_with_dataset_records(self) -> None: + items = [DatasetRecord(id="r1", input="a", dataset_id="ds1")] + assert extract_dataset_id(items) == "ds1" + + def test_with_dataset_items(self) -> None: + items = [DatasetItem(input="a")] + assert extract_dataset_id(items) is None + + def test_empty_list(self) -> None: + assert extract_dataset_id([]) is None + + +class TestBuildEvaluatorsConfig: + """Tests for build_evaluators_config.""" + + def test_none_evaluators(self) -> None: + assert build_evaluators_config(None) == [] + + def test_empty_evaluators(self) -> None: + assert build_evaluators_config([]) == [] + + def test_extracts_configs(self) -> None: + config = _make_evaluator_config() + evaluator = PassEvaluator(config) + result = build_evaluators_config([evaluator]) + assert len(result) == 1 + assert result[0].name == "test_eval" + + def test_skips_evaluators_without_config(self) -> None: + no_config = MagicMock(spec=[]) + result = build_evaluators_config([no_config]) + assert result == [] + + +class TestBuildItemPayload: + """Tests for build_item_payload.""" + + def test_completed_with_output(self) -> None: + ctx = ItemContext( + index=0, + item_input="hello", + expected_output="world", + trace_id="trace-1", + session_id="session-1", + task_output="result", + status="completed", + ) + payload = build_item_payload(ctx, status="completed", include_output=True) + assert payload["traceId"] == "trace-1" + assert payload["taskOutput"] == "result" + assert "status" not in payload + + def test_failed_status(self) -> None: + ctx = ItemContext( + index=0, + item_input="hello", + trace_id="trace-1", + status="failed", + ) + payload = build_item_payload(ctx, status="failed") + assert payload["status"] == "failed" + assert "taskOutput" not in payload + + def test_uses_passed_status_not_ctx_status(self) -> None: + ctx = ItemContext( + index=0, + item_input="hello", + trace_id="trace-1", + status="completed", + task_output="result", + ) + payload = build_item_payload(ctx, status="failed") + assert payload["status"] == "failed" + + def test_with_dataset_item_id(self) -> None: + ctx = ItemContext( + index=0, + item_input="hello", + dataset_item_id="item-1", + trace_id="trace-1", + ) + payload = build_item_payload(ctx, status="completed") + assert payload["datasetItemId"] == "item-1" + assert "input" not in payload + + def test_without_dataset_item_id(self) -> None: + ctx = ItemContext( + index=0, + item_input="hello", + expected_output="world", + metadata={"key": "val"}, + trace_id="trace-1", + ) + payload = build_item_payload(ctx, status="completed") + assert payload["input"] == "hello" + assert payload["expectedOutput"] == "world" + assert payload["metadata"] == {"key": "val"} + + +class TestRunSingleEvaluator: + """Tests for run_single_evaluator.""" + + def test_sync_evaluator(self) -> None: + config = _make_evaluator_config() + evaluator = PassEvaluator(config) + result = asyncio.run( + run_single_evaluator( + evaluator=evaluator, + item_input="hello", + task_output="world", + expected_output="world", + metadata=None, + ) + ) + assert result is not None + assert result["evaluatorName"] == "test_eval" + assert result["isPassed"] is True + + def test_async_evaluator(self) -> None: + config = _make_evaluator_config() + evaluator = AsyncEvaluator(config) + result = asyncio.run( + run_single_evaluator( + evaluator=evaluator, + item_input="hello", + task_output="world", + expected_output="world", + metadata=None, + ) + ) + assert result is not None + assert result["isPassed"] is True + + def test_evaluator_without_evaluate(self) -> None: + evaluator = MagicMock(spec=[]) + result = asyncio.run( + run_single_evaluator( + evaluator=evaluator, + item_input="hello", + task_output="world", + expected_output="world", + metadata=None, + ) + ) + assert result is None + + def test_name_mismatch_returns_none(self) -> None: + config = _make_evaluator_config(name="expected_name") + + class WrongNameEvaluator(BaseEvaluator): + def evaluate(self, context: EvaluatorContext) -> EvaluatorOutput: + return EvaluatorOutput( + evaluator_name="wrong_name", + result=True, + is_passed=True, + ) + + evaluator = WrongNameEvaluator(config) + result = asyncio.run( + run_single_evaluator( + evaluator=evaluator, + item_input="hello", + task_output="world", + expected_output="world", + metadata=None, + ) + ) + assert result is None + + +# --------------------------------------------------------------------------- +# Section 3: Client +# --------------------------------------------------------------------------- + + +class TestEvaluationHttpClient: + """Tests for EvaluationHttpClient.""" + + def test_create_client_with_valid_config(self) -> None: + from netra.evaluation.client import EvaluationHttpClient + + client = EvaluationHttpClient(_make_config()) + assert client._client is not None + client.close() + + def test_create_client_strips_telemetry_suffix(self) -> None: + from netra.evaluation.client import EvaluationHttpClient + + client = EvaluationHttpClient(_make_config(endpoint="https://api.getnetra.ai/telemetry")) + assert client._client is not None + assert "/telemetry" not in str(client._client.base_url) + client.close() + + def test_create_client_returns_none_on_empty_endpoint(self) -> None: + from netra.evaluation.client import EvaluationHttpClient + + client = EvaluationHttpClient(_make_config(endpoint="")) + assert client._client is None + + def test_close_sets_client_to_none(self) -> None: + from netra.evaluation.client import EvaluationHttpClient + + client = EvaluationHttpClient(_make_config()) + assert client._client is not None + client.close() + assert client._client is None + + def test_close_idempotent(self) -> None: + from netra.evaluation.client import EvaluationHttpClient + + client = EvaluationHttpClient(_make_config()) + client.close() + client.close() + assert client._client is None + + def test_ensure_client_returns_none_when_not_initialized(self) -> None: + from netra.evaluation.client import EvaluationHttpClient + + client = EvaluationHttpClient(_make_config(endpoint="")) + assert client._ensure_client() is None + + def test_extract_error_message_from_response(self) -> None: + from netra.evaluation.client import EvaluationHttpClient + + client = EvaluationHttpClient(_make_config(endpoint="")) + mock_response = MagicMock(spec=httpx.Response) + mock_response.json.return_value = {"error": {"message": "custom error"}} + result = client._extract_error_message(mock_response, ValueError("fallback")) + assert result == "custom error" + + def test_extract_error_message_fallback_on_none_response(self) -> None: + from netra.evaluation.client import EvaluationHttpClient + + client = EvaluationHttpClient(_make_config(endpoint="")) + result = client._extract_error_message(None, ValueError("fallback")) + assert result == "fallback" + + def test_extract_error_message_fallback_on_missing_error_key(self) -> None: + from netra.evaluation.client import EvaluationHttpClient + + client = EvaluationHttpClient(_make_config(endpoint="")) + mock_response = MagicMock(spec=httpx.Response) + mock_response.json.return_value = {"other": "data"} + result = client._extract_error_message(mock_response, ValueError("fallback")) + assert result == "fallback" + + def test_extract_error_message_fallback_on_json_parse_error(self) -> None: + from netra.evaluation.client import EvaluationHttpClient + + client = EvaluationHttpClient(_make_config(endpoint="")) + mock_response = MagicMock(spec=httpx.Response) + mock_response.json.side_effect = ValueError("not json") + result = client._extract_error_message(mock_response, RuntimeError("orig")) + assert result == "orig" + + @patch("netra.evaluation.client.httpx.Client") + def test_create_dataset_success(self, mock_client_cls: MagicMock) -> None: + from netra.evaluation.client import EvaluationHttpClient + + mock_response = MagicMock(spec=httpx.Response) + mock_response.json.return_value = {"data": {"id": "ds-1", "name": "test"}} + mock_response.raise_for_status = MagicMock() + + mock_instance = MagicMock() + mock_instance.post.return_value = mock_response + mock_client_cls.return_value = mock_instance + + client = EvaluationHttpClient(_make_config()) + result = client.create_dataset(name="test") + assert result is not None + assert result["id"] == "ds-1" + + @patch("netra.evaluation.client.httpx.Client") + def test_create_dataset_returns_none_on_error(self, mock_client_cls: MagicMock) -> None: + from netra.evaluation.client import EvaluationHttpClient + + mock_instance = MagicMock() + mock_instance.post.side_effect = httpx.ConnectError("timeout") + mock_client_cls.return_value = mock_instance + + client = EvaluationHttpClient(_make_config()) + result = client.create_dataset(name="test") + assert result is None + + def test_create_dataset_returns_none_without_client(self) -> None: + from netra.evaluation.client import EvaluationHttpClient + + client = EvaluationHttpClient(_make_config(endpoint="")) + result = client.create_dataset(name="test") + assert result is None + + @patch("netra.evaluation.client.httpx.Client") + def test_create_run_success(self, mock_client_cls: MagicMock) -> None: + from netra.evaluation.client import EvaluationHttpClient + + mock_response = MagicMock(spec=httpx.Response) + mock_response.json.return_value = {"data": {"id": "run-1"}} + mock_response.raise_for_status = MagicMock() + + mock_instance = MagicMock() + mock_instance.post.return_value = mock_response + mock_client_cls.return_value = mock_instance + + client = EvaluationHttpClient(_make_config()) + result = client.create_run(name="test") + assert result is not None + assert result["id"] == "run-1" + + @patch("netra.evaluation.client.httpx.Client") + def test_post_run_item_success(self, mock_client_cls: MagicMock) -> None: + from netra.evaluation.client import EvaluationHttpClient + + mock_response = MagicMock(spec=httpx.Response) + mock_response.json.return_value = {"data": {"item": {"id": "item-1"}}} + mock_response.raise_for_status = MagicMock() + + mock_instance = MagicMock() + mock_instance.post.return_value = mock_response + mock_client_cls.return_value = mock_instance + + client = EvaluationHttpClient(_make_config()) + result = client.post_run_item("run-1", {"traceId": "t1"}) + assert result == "item-1" + + @patch("netra.evaluation.client.httpx.Client") + def test_post_run_item_returns_none_on_error(self, mock_client_cls: MagicMock) -> None: + from netra.evaluation.client import EvaluationHttpClient + + mock_instance = MagicMock() + mock_instance.post.side_effect = httpx.ConnectError("timeout") + mock_client_cls.return_value = mock_instance + + client = EvaluationHttpClient(_make_config()) + result = client.post_run_item("run-1", {"traceId": "t1"}) + assert result is None + + @patch("netra.evaluation.client.httpx.Client") + def test_get_dataset_success(self, mock_client_cls: MagicMock) -> None: + from netra.evaluation.client import EvaluationHttpClient + + mock_response = MagicMock(spec=httpx.Response) + mock_response.json.return_value = {"data": [{"id": "item-1", "input": "q", "datasetId": "ds-1"}]} + mock_response.raise_for_status = MagicMock() + + mock_instance = MagicMock() + mock_instance.get.return_value = mock_response + mock_client_cls.return_value = mock_instance + + client = EvaluationHttpClient(_make_config()) + result = client.get_dataset("ds-1") + assert result is not None + assert len(result) == 1 + + @patch("netra.evaluation.client.httpx.Client") + def test_post_run_status_success(self, mock_client_cls: MagicMock) -> None: + from netra.evaluation.client import EvaluationHttpClient + + mock_response = MagicMock(spec=httpx.Response) + mock_response.json.return_value = {"data": {"status": "completed"}} + mock_response.raise_for_status = MagicMock() + + mock_instance = MagicMock() + mock_instance.post.return_value = mock_response + mock_client_cls.return_value = mock_instance + + client = EvaluationHttpClient(_make_config()) + result = client.post_run_status("run-1", "completed") + assert result == {"status": "completed"} + + @patch("netra.evaluation.client.httpx.Client") + def test_post_run_status_returns_none_on_error(self, mock_client_cls: MagicMock) -> None: + from netra.evaluation.client import EvaluationHttpClient + + mock_instance = MagicMock() + mock_instance.post.side_effect = httpx.ConnectError("timeout") + mock_client_cls.return_value = mock_instance + + client = EvaluationHttpClient(_make_config()) + result = client.post_run_status("run-1", "completed") + assert result is None + + @patch("netra.evaluation.client.httpx.Client") + def test_get_run_results_success(self, mock_client_cls: MagicMock) -> None: + from netra.evaluation.client import EvaluationHttpClient + + mock_response = MagicMock(spec=httpx.Response) + mock_response.json.return_value = {"data": {"items": []}} + mock_response.raise_for_status = MagicMock() + + mock_instance = MagicMock() + mock_instance.get.return_value = mock_response + mock_client_cls.return_value = mock_instance + + client = EvaluationHttpClient(_make_config()) + result = client.get_run_results("run-1") + assert result is not None + + @patch("netra.evaluation.client.httpx.Client") + def test_submit_local_evaluations_success(self, mock_client_cls: MagicMock) -> None: + from netra.evaluation.client import EvaluationHttpClient + + mock_response = MagicMock(spec=httpx.Response) + mock_response.json.return_value = {"data": {"success": True}} + mock_response.raise_for_status = MagicMock() + + mock_instance = MagicMock() + mock_instance.post.return_value = mock_response + mock_client_cls.return_value = mock_instance + + client = EvaluationHttpClient(_make_config()) + result = client.submit_local_evaluations("run-1", "item-1", [{"evaluatorName": "e1"}]) + assert result is not None + + @patch("netra.evaluation.client.httpx.Client") + def test_add_dataset_item_success(self, mock_client_cls: MagicMock) -> None: + from netra.evaluation.client import EvaluationHttpClient + + mock_response = MagicMock(spec=httpx.Response) + mock_response.json.return_value = {"data": {"id": "item-1", "input": "q"}} + mock_response.raise_for_status = MagicMock() + + mock_instance = MagicMock() + mock_instance.post.return_value = mock_response + mock_client_cls.return_value = mock_instance + + client = EvaluationHttpClient(_make_config()) + item = DatasetItem(input="hello", expected_output="world") + result = client.add_dataset_item("ds-1", item) + assert result is not None + assert result["id"] == "item-1" + + @patch("netra.evaluation.client.httpx.Client") + def test_get_span_by_id_success(self, mock_client_cls: MagicMock) -> None: + from netra.evaluation.client import EvaluationHttpClient + + mock_response = MagicMock(spec=httpx.Response) + mock_response.json.return_value = {"data": {"spanId": "span-1"}} + mock_response.raise_for_status = MagicMock() + + mock_instance = MagicMock() + mock_instance.get.return_value = mock_response + mock_client_cls.return_value = mock_instance + + client = EvaluationHttpClient(_make_config()) + result = client.get_span_by_id("span-1") + assert result is not None + + @patch("netra.evaluation.client.httpx.Client") + def test_get_span_by_id_returns_none_on_error(self, mock_client_cls: MagicMock) -> None: + from netra.evaluation.client import EvaluationHttpClient + + mock_instance = MagicMock() + mock_instance.get.side_effect = httpx.ConnectError("timeout") + mock_client_cls.return_value = mock_instance + + client = EvaluationHttpClient(_make_config()) + result = client.get_span_by_id("span-1") + assert result is None + + +# --------------------------------------------------------------------------- +# Section 4: API (Evaluation class) +# --------------------------------------------------------------------------- + + +class TestEvaluation: + """Tests for the Evaluation public API.""" + + @patch("netra.evaluation.api.EvaluationHttpClient") + def test_create_dataset_success(self, mock_client_cls: MagicMock) -> None: + from netra.evaluation.api import Evaluation + + mock_client = MagicMock() + mock_client.create_dataset.return_value = { + "projectId": "p1", + "organizationId": "o1", + "name": "ds1", + "tags": [], + "createdBy": "user", + "updatedBy": "user", + "updatedAt": "2025-01-01", + "id": "ds-1", + "createdAt": "2025-01-01", + } + mock_client_cls.return_value = mock_client + + evaluation = Evaluation(_make_config()) + result = evaluation.create_dataset(name="ds1") + assert isinstance(result, CreateDatasetResponse) + assert result.id == "ds-1" + + @patch("netra.evaluation.api.EvaluationHttpClient") + def test_create_dataset_returns_none_on_empty_name(self, mock_client_cls: MagicMock) -> None: + from netra.evaluation.api import Evaluation + + evaluation = Evaluation(_make_config()) + result = evaluation.create_dataset(name="") + assert result is None + + @patch("netra.evaluation.api.EvaluationHttpClient") + def test_create_dataset_returns_none_on_client_failure(self, mock_client_cls: MagicMock) -> None: + from netra.evaluation.api import Evaluation + + mock_client = MagicMock() + mock_client.create_dataset.return_value = None + mock_client_cls.return_value = mock_client + + evaluation = Evaluation(_make_config()) + result = evaluation.create_dataset(name="ds1") + assert result is None + + @patch("netra.evaluation.api.EvaluationHttpClient") + def test_add_dataset_item_success(self, mock_client_cls: MagicMock) -> None: + from netra.evaluation.api import Evaluation + + mock_client = MagicMock() + mock_client.add_dataset_item.return_value = { + "datasetId": "ds-1", + "projectId": "p1", + "organizationId": "o1", + "source": "sdk", + "input": "hello", + "createdBy": "user", + "updatedBy": "user", + "updatedAt": "2025-01-01", + "id": "item-1", + "createdAt": "2025-01-01", + } + mock_client_cls.return_value = mock_client + + evaluation = Evaluation(_make_config()) + result = evaluation.add_dataset_item("ds-1", DatasetItem(input="hello")) + assert isinstance(result, AddDatasetItemResponse) + assert result.id == "item-1" + + @patch("netra.evaluation.api.EvaluationHttpClient") + def test_add_dataset_item_returns_none_on_empty_input(self, mock_client_cls: MagicMock) -> None: + from netra.evaluation.api import Evaluation + + evaluation = Evaluation(_make_config()) + result = evaluation.add_dataset_item("ds-1", DatasetItem(input="")) + assert result is None + + @patch("netra.evaluation.api.EvaluationHttpClient") + def test_add_dataset_item_returns_none_on_client_failure(self, mock_client_cls: MagicMock) -> None: + from netra.evaluation.api import Evaluation + + mock_client = MagicMock() + mock_client.add_dataset_item.return_value = None + mock_client_cls.return_value = mock_client + + evaluation = Evaluation(_make_config()) + result = evaluation.add_dataset_item("ds-1", DatasetItem(input="hello")) + assert result is None + + @patch("netra.evaluation.api.EvaluationHttpClient") + def test_get_dataset_success(self, mock_client_cls: MagicMock) -> None: + from netra.evaluation.api import Evaluation + + mock_client = MagicMock() + mock_client.get_dataset.return_value = [ + {"id": "item-1", "input": "q", "datasetId": "ds-1", "expectedOutput": "a"} + ] + mock_client_cls.return_value = mock_client + + evaluation = Evaluation(_make_config()) + result = evaluation.get_dataset("ds-1") + assert isinstance(result, GetDatasetItemsResponse) + assert len(result.items) == 1 + + @patch("netra.evaluation.api.EvaluationHttpClient") + def test_get_dataset_returns_none_on_empty_id(self, mock_client_cls: MagicMock) -> None: + from netra.evaluation.api import Evaluation + + evaluation = Evaluation(_make_config()) + result = evaluation.get_dataset("") + assert result is None + + @patch("netra.evaluation.api.EvaluationHttpClient") + def test_get_dataset_skips_invalid_items(self, mock_client_cls: MagicMock) -> None: + from netra.evaluation.api import Evaluation + + mock_client = MagicMock() + mock_client.get_dataset.return_value = [ + {"id": "item-1", "input": "q", "datasetId": "ds-1"}, + {"id": None, "input": "q", "datasetId": "ds-1"}, + ] + mock_client_cls.return_value = mock_client + + evaluation = Evaluation(_make_config()) + result = evaluation.get_dataset("ds-1") + assert isinstance(result, GetDatasetItemsResponse) + assert len(result.items) == 1 + + @patch("netra.evaluation.api.EvaluationHttpClient") + def test_create_run_success(self, mock_client_cls: MagicMock) -> None: + from netra.evaluation.api import Evaluation + + mock_client = MagicMock() + mock_client.create_run.return_value = {"id": "run-1"} + mock_client_cls.return_value = mock_client + + evaluation = Evaluation(_make_config()) + result = evaluation.create_run(name="test") + assert result == "run-1" + + @patch("netra.evaluation.api.EvaluationHttpClient") + def test_create_run_returns_none_on_empty_name(self, mock_client_cls: MagicMock) -> None: + from netra.evaluation.api import Evaluation + + evaluation = Evaluation(_make_config()) + result = evaluation.create_run(name="") + assert result is None + + @patch("netra.evaluation.api.EvaluationHttpClient") + def test_create_run_returns_none_on_client_failure(self, mock_client_cls: MagicMock) -> None: + from netra.evaluation.api import Evaluation + + mock_client = MagicMock() + mock_client.create_run.return_value = None + mock_client_cls.return_value = mock_client + + evaluation = Evaluation(_make_config()) + result = evaluation.create_run(name="test") + assert result is None + + @patch("netra.evaluation.api.EvaluationHttpClient") + def test_get_run_results(self, mock_client_cls: MagicMock) -> None: + from netra.evaluation.api import Evaluation + + mock_client = MagicMock() + mock_client.get_run_results.return_value = {"items": []} + mock_client_cls.return_value = mock_client + + evaluation = Evaluation(_make_config()) + result = evaluation.get_run_results("run-1") + assert result == {"items": []} + + @patch("netra.evaluation.api.EvaluationHttpClient") + def test_get_run_results_returns_none_on_empty_id(self, mock_client_cls: MagicMock) -> None: + from netra.evaluation.api import Evaluation + + evaluation = Evaluation(_make_config()) + result = evaluation.get_run_results("") + assert result is None + + @patch("netra.evaluation.api.EvaluationHttpClient") + def test_close_delegates_to_client(self, mock_client_cls: MagicMock) -> None: + from netra.evaluation.api import Evaluation + + mock_client = MagicMock() + mock_client_cls.return_value = mock_client + + evaluation = Evaluation(_make_config()) + evaluation.close() + mock_client.close.assert_called_once() + + @patch("netra.evaluation.api.SpanWrapper") + @patch("netra.evaluation.api.EvaluationHttpClient") + def test_run_test_suite_success(self, mock_client_cls: MagicMock, mock_span_wrapper: MagicMock) -> None: + from netra.evaluation.api import Evaluation + + mock_span = MagicMock() + mock_span.__enter__ = MagicMock(return_value=mock_span) + mock_span.__exit__ = MagicMock(return_value=False) + mock_span.get_current_span.return_value = None + mock_span_wrapper.return_value = mock_span + + mock_client = MagicMock() + mock_client.create_run.return_value = {"id": "run-1"} + mock_client.post_run_item.return_value = "item-1" + mock_client.post_run_status.return_value = None + + async def mock_wait(*args: Any, **kwargs: Any) -> bool: + return True + + mock_client.wait_for_span_ingestion = mock_wait + mock_client_cls.return_value = mock_client + + evaluation = Evaluation(_make_config()) + dataset = Dataset(items=[DatasetItem(input="hello")]) + result = evaluation.run_test_suite( + name="test", + data=dataset, + task=lambda x: f"result: {x}", + ) + + assert result is not None + assert result["runId"] == "run-1" + assert len(result["items"]) == 1 + + @patch("netra.evaluation.api.SpanWrapper") + @patch("netra.evaluation.api.EvaluationHttpClient") + def test_run_test_suite_marks_failed_on_exception( + self, mock_client_cls: MagicMock, mock_span_wrapper: MagicMock + ) -> None: + from netra.evaluation.api import Evaluation + + mock_span = MagicMock() + mock_span.__enter__ = MagicMock(return_value=mock_span) + mock_span.__exit__ = MagicMock(return_value=False) + mock_span.get_current_span.return_value = None + mock_span_wrapper.return_value = mock_span + + mock_client = MagicMock() + mock_client.create_run.return_value = {"id": "run-1"} + mock_client.post_run_item.side_effect = RuntimeError("backend down") + mock_client.post_run_status.return_value = None + mock_client_cls.return_value = mock_client + + evaluation = Evaluation(_make_config()) + dataset = Dataset(items=[DatasetItem(input="hello")]) + + with pytest.raises(RuntimeError, match="backend down"): + evaluation.run_test_suite( + name="test", + data=dataset, + task=lambda x: f"result: {x}", + ) + + mock_client.post_run_status.assert_called_with("run-1", "failed") + + @patch("netra.evaluation.api.EvaluationHttpClient") + def test_run_test_suite_returns_none_when_create_run_fails(self, mock_client_cls: MagicMock) -> None: + from netra.evaluation.api import Evaluation + + mock_client = MagicMock() + mock_client.create_run.return_value = None + mock_client_cls.return_value = mock_client + + evaluation = Evaluation(_make_config()) + dataset = Dataset(items=[DatasetItem(input="hello")]) + result = evaluation.run_test_suite( + name="test", + data=dataset, + task=lambda x: x, + ) + assert result is None + + @patch("netra.evaluation.api.SpanWrapper") + @patch("netra.evaluation.api.EvaluationHttpClient") + def test_run_test_suite_with_evaluators(self, mock_client_cls: MagicMock, mock_span_wrapper: MagicMock) -> None: + from netra.evaluation.api import Evaluation + + mock_span = MagicMock() + mock_span.__enter__ = MagicMock(return_value=mock_span) + mock_span.__exit__ = MagicMock(return_value=False) + mock_otel_span = MagicMock() + mock_otel_span.get_span_context.return_value = MagicMock(trace_id=123, span_id=456) + mock_span.get_current_span.return_value = mock_otel_span + mock_span_wrapper.return_value = mock_span + + async def mock_wait(*args: Any, **kwargs: Any) -> bool: + return True + + mock_client = MagicMock() + mock_client.create_run.return_value = {"id": "run-1"} + mock_client.post_run_item.return_value = "item-1" + mock_client.post_run_status.return_value = None + mock_client.wait_for_span_ingestion = mock_wait + mock_client.submit_local_evaluations.return_value = None + mock_client_cls.return_value = mock_client + + config = _make_evaluator_config() + evaluator = PassEvaluator(config) + + evaluation = Evaluation(_make_config()) + dataset = Dataset(items=[DatasetItem(input="hello")]) + result = evaluation.run_test_suite( + name="test", + data=dataset, + task=lambda x: f"result: {x}", + evaluators=[evaluator], + ) + + assert result is not None + assert result["runId"] == "run-1" + mock_client.submit_local_evaluations.assert_called_once() + + @patch("netra.evaluation.api.SpanWrapper") + @patch("netra.evaluation.api.EvaluationHttpClient") + def test_run_test_suite_handles_failed_task(self, mock_client_cls: MagicMock, mock_span_wrapper: MagicMock) -> None: + from netra.evaluation.api import Evaluation + + mock_span = MagicMock() + mock_span.__enter__ = MagicMock(return_value=mock_span) + mock_span.__exit__ = MagicMock(return_value=False) + mock_span.get_current_span.return_value = None + mock_span_wrapper.return_value = mock_span + + mock_client = MagicMock() + mock_client.create_run.return_value = {"id": "run-1"} + mock_client.post_run_item.return_value = "item-1" + mock_client.post_run_status.return_value = None + mock_client_cls.return_value = mock_client + + def failing_task(inp: Any) -> None: + raise ValueError("task failed") + + evaluation = Evaluation(_make_config()) + dataset = Dataset(items=[DatasetItem(input="hello")]) + result = evaluation.run_test_suite( + name="test", + data=dataset, + task=failing_task, + ) + + assert result is not None + assert result["items"][0]["status"] == "failed" + + +# --------------------------------------------------------------------------- +# Section 5: Evaluator +# --------------------------------------------------------------------------- + + +class TestBaseEvaluator: + """Tests for BaseEvaluator abstract class.""" + + def test_cannot_instantiate_directly(self) -> None: + with pytest.raises(TypeError): + BaseEvaluator(_make_evaluator_config()) # type: ignore[abstract] + + def test_sync_subclass(self) -> None: + config = _make_evaluator_config() + evaluator = PassEvaluator(config) + context = EvaluatorContext(input="x", task_output="y") + result = evaluator.evaluate(context) + assert isinstance(result, EvaluatorOutput) + assert result.is_passed is True + + def test_fail_evaluator(self) -> None: + config = _make_evaluator_config() + evaluator = FailEvaluator(config) + context = EvaluatorContext(input="x", task_output="y") + result = evaluator.evaluate(context) + assert result.is_passed is False + + def test_config_accessible(self) -> None: + config = _make_evaluator_config(name="my_eval") + evaluator = PassEvaluator(config) + assert evaluator.config.name == "my_eval"