class AutoevalsAdapter(BaseAdapter):
    """Adapter that runs an Autoevals scorer against AgentCore evaluation events.

    The scorer is invoked as ``scorer.eval(input=..., output=..., expected=...)``
    and its numeric score is converted to a ``"Pass"``/``"Fail"`` label by
    comparing it with :attr:`PASS_THRESHOLD`.

    Example::

        from autoevals import Factuality

        scorer = Factuality()
        handler = AutoevalsAdapter(scorer=scorer)

        # Use as Lambda handler
        def lambda_handler(event, context):
            return handler(event, context)
    """

    # Minimum score (inclusive) that is labelled "Pass". Subclasses may override.
    PASS_THRESHOLD = 0.5

    # Fields that must be present and non-empty after default span extraction.
    _REQUIRED_FIELDS = ("input", "actual_output")

    def __init__(
        self,
        scorer: Any,
        field_mapper: Optional[Callable[[Dict[str, Any]], Dict[str, Any]]] = None,
        timeout: Optional[int] = None,
    ):
        """Initialize the adapter.

        Args:
            scorer: An Autoevals scorer instance (e.g. Factuality(), ClosedQA()).
            field_mapper: Optional callable that receives the raw Lambda event and
                returns a dict of field values. Bypasses default span extraction.
            timeout: Maximum seconds to allow for scorer.eval(). Defaults to 290.
        """
        super().__init__(field_mapper=field_mapper, timeout=timeout)
        self.scorer = scorer

    def validate_fields(self, fields: Dict[str, Any]) -> None:
        """Validate that input and actual_output are present and non-empty.

        Args:
            fields: Field dict produced by span extraction.

        Raises:
            ValueError: If either required field is missing or empty.
        """
        missing = [name for name in self._REQUIRED_FIELDS if not fields.get(name)]
        if missing:
            scorer_name = type(self.scorer).__name__
            raise ValueError(
                f"Field(s) {missing} required by {scorer_name} but not found in evaluation event. "
                f"Provide a field_mapper or ensure spans contain the necessary data."
            )

    def execute(self, fields: Dict[str, Any]) -> Dict[str, Any]:
        """Run the Autoevals scorer and return formatted results.

        Args:
            fields: Field dict with "input", "actual_output" and optionally
                "expected_output".

        Returns:
            {"value": score, "label": "Pass" | "Fail", "explanation": str}.
            A ``None`` score is always labelled "Fail".
        """
        kwargs: Dict[str, Any] = {
            "input": fields.get("input", ""),
            "output": fields.get("actual_output", ""),
        }
        # Only forward "expected" when a reference answer exists, so scorers
        # that do not take one are not handed an empty value.
        expected = fields.get("expected_output")
        if expected:
            kwargs["expected"] = expected

        result = self.scorer.eval(**kwargs)

        score = result.score
        label = "Pass" if score is not None and score >= self.PASS_THRESHOLD else "Fail"

        # The result may lack ``metadata`` or carry ``None``; only read the
        # rationale from an actual dict so a successful score is never turned
        # into an AttributeError.
        metadata = getattr(result, "metadata", None)
        rationale = metadata.get("rationale") if isinstance(metadata, dict) else None

        return {"value": score, "label": label, "explanation": rationale or ""}
@dataclass
class ParsedEvaluationEvent:
    """Parsed representation of the AgentCore Lambda evaluation event.

    Attributes:
        evaluation_level: Value of the event's ``evaluationLevel`` key.
        session_spans: Value of ``evaluationInput.sessionSpans``.
        target_trace_id: First entry of ``evaluationTarget.traceIds``, if any.
        target_span_id: First entry of ``evaluationTarget.spanIds``, if any.
        reference_inputs: Value of ``evaluationReferenceInputs`` (empty if absent).
    """

    evaluation_level: str
    session_spans: List[Dict[str, Any]]
    target_trace_id: Optional[str] = None
    target_span_id: Optional[str] = None
    reference_inputs: List[Dict[str, Any]] = field(default_factory=list)

    @staticmethod
    def _list_or_empty(container: Any, key: str) -> List[Any]:
        """Return ``container[key]`` as a list, treating missing/falsy as empty.

        Raises:
            TypeError: If the value is present but is not a list or tuple.
        """
        value = container.get(key) or []
        if not isinstance(value, (list, tuple)):
            raise TypeError(f"'{key}' must be a list, got {type(value).__name__}")
        return list(value)

    @classmethod
    def from_lambda_event(cls, event: Dict[str, Any]) -> "ParsedEvaluationEvent":
        """Parse a raw Lambda event dict into a structured object.

        Args:
            event: Raw Lambda event payload from the evaluation service.

        Returns:
            ParsedEvaluationEvent with extracted fields.

        Raises:
            KeyError: If required top-level fields are missing.
            TypeError: If the event, or one of its sections, has the wrong type
                (for example a non-dict event, a non-dict ``evaluationTarget``
                or a non-list ``sessionSpans``).
        """
        try:
            evaluation_input = event["evaluationInput"]
            target = event.get("evaluationTarget") or {}
            trace_ids = cls._list_or_empty(target, "traceIds")
            span_ids = cls._list_or_empty(target, "spanIds")
            evaluation_level = event["evaluationLevel"]
            session_spans = evaluation_input["sessionSpans"]
            reference_inputs = cls._list_or_empty(event, "evaluationReferenceInputs")
        except AttributeError as exc:
            # A section that should be a dict is something else (no ``.get``).
            # Surface it as TypeError so callers need only handle the
            # documented exception types.
            raise TypeError(f"Malformed evaluation event: {exc}") from exc

        if not isinstance(session_spans, list):
            raise TypeError(f"'sessionSpans' must be a list, got {type(session_spans).__name__}")

        return cls(
            evaluation_level=evaluation_level,
            session_spans=session_spans,
            target_trace_id=trace_ids[0] if trace_ids else None,
            target_span_id=span_ids[0] if span_ids else None,
            reference_inputs=reference_inputs,
        )
def _get_message_content(message: Any) -> str:
    """Extract text content from a message object.

    A message may be a plain string, or a dict carrying its text under a
    "content" or "message" key. Nested dicts are unwrapped recursively
    (e.g. {"content": {"content": "text"}}). A key whose value is ``None``
    is treated as absent, so it is never stringified to the literal "None".

    Args:
        message: Message object of unknown shape.

    Returns:
        The extracted text, or "" when no text can be found.
    """
    if isinstance(message, str):
        return message
    if not isinstance(message, dict):
        return ""

    for key in ("content", "message"):
        if key not in message:
            continue
        value = message[key]
        if value is None:
            # Explicit null content: try the next key.
            continue
        if isinstance(value, str):
            return value
        if isinstance(value, dict):
            return _get_message_content(value)
        return str(value)
    return ""


def _load_log_records(span: Any) -> List[Any]:
    """Return the ``_eval_log_records`` list of a span, or [] if unusable.

    The attribute may hold a list or a JSON-encoded string of a list.
    """
    if not isinstance(span, dict):
        return []
    attributes = span.get("attributes")
    if not isinstance(attributes, dict):
        return []

    raw = attributes.get("_eval_log_records")
    if not raw:
        return []
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except (json.JSONDecodeError, TypeError):
            logger.debug("Failed to parse _eval_log_records as JSON")
            return []
    return raw if isinstance(raw, list) else []


def _role_messages(body: Dict[str, Any], section: str) -> List[tuple]:
    """Return (role, content) pairs for non-empty messages in body[section]."""
    data = body.get(section)
    if not isinstance(data, dict):
        return []
    messages = data.get("messages")
    if not isinstance(messages, (list, tuple)):
        return []

    pairs = []
    for msg in messages:
        if not isinstance(msg, dict):
            continue
        content = _get_message_content(msg)
        if content:
            pairs.append((msg.get("role", ""), content))
    return pairs


def extract_fields_from_spans(
    parsed: "ParsedEvaluationEvent",
) -> Dict[str, Any]:
    """Extract evaluation fields from AgentCore session spans.

    Parses _eval_log_records from span attributes, filters by target_trace_id,
    and extracts messages by role:
    - input ← input messages where role=="user"
    - actual_output ← output messages where role=="assistant"
    - retrieval_context ← output messages where role=="tool"
    - context ← copy of retrieval_context
    - expected_output ← evaluationReferenceInputs[0].expectedResponse

    Malformed spans, records and messages are skipped rather than raising.

    Args:
        parsed: Parsed event providing ``session_spans``, ``target_trace_id``
            and ``reference_inputs``.

    Returns:
        Dict containing only the fields that could be populated.
    """
    user_messages: List[str] = []
    assistant_messages: List[str] = []
    tool_messages: List[str] = []

    for span in parsed.session_spans or []:
        for record in _load_log_records(span):
            if not isinstance(record, dict):
                continue

            if parsed.target_trace_id:
                # Records with no trace ID are kept; only records that name a
                # different trace are dropped.
                record_trace_id = record.get("traceId") or record.get("trace_id")
                if record_trace_id and record_trace_id != parsed.target_trace_id:
                    continue

            body = record.get("body", {})
            if not isinstance(body, dict):
                continue

            for role, content in _role_messages(body, "input"):
                if role == "user":
                    user_messages.append(content)

            for role, content in _role_messages(body, "output"):
                if role == "assistant":
                    assistant_messages.append(content)
                elif role == "tool":
                    tool_messages.append(content)

    fields: Dict[str, Any] = {}

    if user_messages:
        fields["input"] = "\n".join(user_messages)
    if assistant_messages:
        fields["actual_output"] = "\n".join(assistant_messages)
    if tool_messages:
        fields["retrieval_context"] = tool_messages
        # Separate list object so mutating one field cannot alter the other.
        fields["context"] = list(tool_messages)

    references = parsed.reference_inputs
    if isinstance(references, (list, tuple)) and references and isinstance(references[0], dict):
        expected = references[0].get("expectedResponse")
        if expected:
            fields["expected_output"] = expected

    return fields
class _ExecutionTimeout(Exception):
    """Raised when execution exceeds the configured timeout."""


def _error_response(code: str, message: str) -> Dict[str, str]:
    """Build a standardized error response dict.

    Args:
        code: Machine-readable error code (e.g. "INVALID_EVENT").
        message: Human-readable description of the failure.

    Returns:
        {"errorCode": code, "errorMessage": message}
    """
    return {"errorCode": code, "errorMessage": message}


class BaseAdapter(abc.ABC):
    """Base adapter for evaluation framework integrations.

    Subclasses only need to implement execute(fields), which runs the actual
    evaluation logic and returns a response dict of the form
    {"value": ..., "label": ..., "explanation": ...}. They may also override
    validate_fields() to enforce required fields.

    Calling the adapter never raises for ordinary failures: every stage
    (event parsing, field extraction, execution) is converted into an error
    response dict.
    """

    DEFAULT_TIMEOUT = 290

    def __init__(
        self,
        field_mapper: Optional[Callable[[Dict[str, Any]], Dict[str, Any]]] = None,
        timeout: Optional[int] = None,
    ):
        """Initialize the adapter.

        Args:
            field_mapper: Optional callable that receives the raw Lambda event and
                returns a dict of field values. Bypasses default span extraction
                and validate_fields().
            timeout: Maximum seconds to allow for execute(). Defaults to 290
                (slightly under Lambda's 300s max). A value <= 0 disables the
                timeout and runs execute() in the calling thread.
        """
        self.field_mapper = field_mapper
        self.timeout = timeout if timeout is not None else self.DEFAULT_TIMEOUT

    def __call__(self, event: "Union[Dict[str, Any], EvaluatorInput]", context: Any = None) -> Dict[str, Any]:
        """Handle a Lambda invocation.

        Args:
            event: Either a raw Lambda event dict or an EvaluatorInput instance
                from bedrock_agentcore.evaluation.custom_code_based_evaluators.models.
            context: Lambda context object (unused).

        Returns:
            Success: {"value": float, "label": str, "explanation": str}
            Error: {"errorCode": str, "errorMessage": str} where errorCode is one of
                INVALID_EVENT, MISSING_REQUIRED_FIELD, FIELD_EXTRACTION_ERROR,
                METRIC_TIMEOUT or METRIC_ERROR.
        """
        try:
            if isinstance(event, EvaluatorInput):
                parsed = ParsedEvaluationEvent(
                    evaluation_level=event.evaluation_level,
                    session_spans=event.session_spans,
                    target_trace_id=event.target_trace_id,
                    target_span_id=event.target_span_id,
                    reference_inputs=getattr(event, "reference_inputs", []) or [],
                )
            else:
                parsed = ParsedEvaluationEvent.from_lambda_event(event)
        except (KeyError, IndexError, TypeError, AttributeError) as e:
            # AttributeError covers sections of the wrong type (e.g. a list
            # where a dict with ``.get`` is expected).
            logger.error("Failed to parse evaluation event: %s", e)
            return _error_response("INVALID_EVENT", f"Failed to parse evaluation event: {e}")

        try:
            fields = self._extract_fields(parsed)
        except ValueError as e:
            logger.error("Missing required fields: %s", e)
            return _error_response("MISSING_REQUIRED_FIELD", str(e))
        except Exception as e:
            # A crashing field_mapper or malformed span data must not escape
            # the handler.
            logger.error("Field extraction failed: %s", e, exc_info=True)
            return _error_response("FIELD_EXTRACTION_ERROR", f"Failed to extract fields: {e}")

        try:
            result = self._execute_with_timeout(fields)
        except _ExecutionTimeout:
            # The worker is a daemon thread and cannot be cancelled; it may
            # still be running after this response is returned.
            logger.warning("%s exceeded %ss timeout", type(self).__name__, self.timeout)
            return _error_response(
                "METRIC_TIMEOUT",
                f"{type(self).__name__} exceeded {self.timeout}s timeout.",
            )
        except Exception as e:
            logger.error("Execution failed: %s", e, exc_info=True)
            return _error_response("METRIC_ERROR", f"{type(self).__name__} failed: {e}")

        return result

    def _extract_fields(self, parsed: "ParsedEvaluationEvent") -> Dict[str, Any]:
        """Extract fields from event, using field_mapper if provided.

        With a field_mapper, the event is rebuilt in its raw Lambda shape and
        the mapper's result is returned without calling validate_fields().
        Otherwise fields come from the session spans and are validated.

        Raises:
            ValueError: If validate_fields() rejects the extracted fields.
            TypeError: If field_mapper returns something other than a dict.
        """
        if self.field_mapper is not None:
            raw_event = {
                "evaluationLevel": parsed.evaluation_level,
                "evaluationInput": {"sessionSpans": parsed.session_spans},
                "evaluationTarget": {
                    "traceIds": [parsed.target_trace_id] if parsed.target_trace_id else [],
                    "spanIds": [parsed.target_span_id] if parsed.target_span_id else [],
                },
                "evaluationReferenceInputs": parsed.reference_inputs,
            }
            mapped = self.field_mapper(raw_event)
            if not isinstance(mapped, dict):
                raise TypeError(f"field_mapper must return a dict, got {type(mapped).__name__}")
            return mapped

        fields = extract_fields_from_spans(parsed)
        self.validate_fields(fields)
        return fields

    def validate_fields(self, fields: Dict[str, Any]) -> None:
        """Validate that required fields are present.

        Override in subclasses to enforce field requirements by raising
        ValueError. Default implementation does nothing.
        """

    @abc.abstractmethod
    def execute(self, fields: Dict[str, Any]) -> Dict[str, Any]:
        """Run the evaluation and return the response dict.

        Args:
            fields: Extracted field dict with keys like "input", "actual_output", etc.

        Returns:
            {"value": float, "label": str, "explanation": str}
        """

    def _execute_with_timeout(self, fields: Dict[str, Any]) -> Dict[str, Any]:
        """Run execute() with a thread-based timeout.

        Args:
            fields: Field dict forwarded to execute().

        Returns:
            Whatever execute() returns.

        Raises:
            _ExecutionTimeout: If execute() has not finished within self.timeout
                seconds. The worker thread is not stopped.
            Exception: Any exception raised by execute(), re-raised in the
                calling thread.
            RuntimeError: If execute() ended with a non-Exception BaseException
                (e.g. SystemExit) or produced no result.
        """
        if self.timeout <= 0:
            return self.execute(fields)

        outcome: Dict[str, Any] = {}

        def worker() -> None:
            try:
                outcome["result"] = self.execute(fields)
            except BaseException as exc:  # captured and re-raised in the caller's thread
                outcome["error"] = exc

        thread = threading.Thread(target=worker, name=f"{type(self).__name__}-execute", daemon=True)
        thread.start()
        thread.join(timeout=self.timeout)

        if thread.is_alive():
            raise _ExecutionTimeout()

        error = outcome.get("error")
        if error is not None:
            if isinstance(error, Exception):
                raise error
            # Do not let SystemExit and similar escape the handler; report
            # them as an ordinary failure with the cause attached.
            raise RuntimeError(f"execute() aborted with {type(error).__name__}: {error}") from error

        if "result" not in outcome:
            raise RuntimeError("execute() finished without producing a result")
        return outcome["result"]
Optional + +from deepeval.metrics import BaseMetric +from deepeval.test_case import LLMTestCase, SingleTurnParams + +from bedrock_agentcore.evaluation.integrations.base import ( + BaseAdapter, + ParsedEvaluationEvent, + extract_fields_from_spans, +) + +logger = logging.getLogger(__name__) + +_PARAM_TO_FIELD: Dict[SingleTurnParams, str] = { + SingleTurnParams.INPUT: "input", + SingleTurnParams.ACTUAL_OUTPUT: "actual_output", + SingleTurnParams.EXPECTED_OUTPUT: "expected_output", + SingleTurnParams.CONTEXT: "context", + SingleTurnParams.RETRIEVAL_CONTEXT: "retrieval_context", +} + +_METRIC_REQUIRED_PARAMS: Dict[str, List[str]] = { + "AnswerRelevancyMetric": ["input", "actual_output"], + "FaithfulnessMetric": ["input", "actual_output", "retrieval_context"], + "ContextualRelevancyMetric": ["input", "actual_output", "retrieval_context"], + "ContextualPrecisionMetric": ["input", "actual_output", "expected_output", "retrieval_context"], + "ContextualRecallMetric": ["input", "actual_output", "expected_output", "retrieval_context"], + "HallucinationMetric": ["input", "actual_output", "context"], + "BiasMetric": ["input", "actual_output"], + "ToxicityMetric": ["input", "actual_output"], + "GEval": ["input", "actual_output"], + "SummarizationMetric": ["input", "actual_output"], +} + + +def _get_required_params(metric: BaseMetric) -> List[str]: + """Determine which LLMTestCase fields a metric requires. + + Fallback chain: + 1. metric._required_params (DeepEval internal attribute) + 2. Static registry _METRIC_REQUIRED_PARAMS keyed by class name + 3. metric.evaluation_params (GEval special case) + 4. 
class DeepEvalAdapter(BaseAdapter):
    """Adapter that runs a DeepEval metric against AgentCore evaluation events.

    Example::

        from deepeval.metrics import AnswerRelevancyMetric

        metric = AnswerRelevancyMetric(threshold=0.7)
        handler = DeepEvalAdapter(metric=metric)

        # Use as Lambda handler
        def lambda_handler(event, context):
            return handler(event, context)
    """

    # Threshold used for the Pass/Fail fallback when the metric has none.
    DEFAULT_THRESHOLD = 0.5

    def __init__(
        self,
        metric: BaseMetric,
        field_mapper: Optional[Callable[[Dict[str, Any]], Dict[str, Any]]] = None,
        model: Optional[Any] = None,
        timeout: Optional[int] = None,
    ):
        """Initialize the adapter.

        Args:
            metric: A DeepEval BaseMetric instance (e.g. AnswerRelevancyMetric).
            field_mapper: Optional callable that receives the raw Lambda event and
                returns a dict of LLMTestCase field values. Bypasses default span
                extraction when provided.
            model: Optional model override for the metric's LLM. Can be a string
                model ID (e.g. "bedrock/anthropic.claude-3") or a DeepEvalBaseLLM
                subclass instance.
            timeout: Maximum seconds to allow for metric.measure(). Defaults to 290
                (slightly under Lambda's 300s max).
        """
        super().__init__(field_mapper=field_mapper, timeout=timeout)
        self.metric = metric
        if model is not None:
            # NOTE(review): this assigns the override directly onto the caller's
            # metric object. Confirm that DeepEval metrics accept a raw string
            # model ID assigned after construction.
            self.metric.model = model

    def validate_fields(self, fields: Dict[str, Any]) -> None:
        """Validate that fields required by the metric are present.

        Args:
            fields: Field dict produced by span extraction.

        Raises:
            ValueError: If a field required by the metric is missing or empty.
        """
        required = _get_required_params(self.metric)
        missing = [name for name in required if not fields.get(name)]
        if missing:
            metric_name = type(self.metric).__name__
            raise ValueError(
                f"Field(s) {missing} required by {metric_name} but not found in evaluation event. "
                f"Provide a field_mapper or ensure spans contain the necessary data."
            )

    def _is_successful(self, score: Any) -> bool:
        """Decide Pass/Fail from the metric state after measure().

        The metric's own ``success`` flag is authoritative when it is set.
        Only when it is absent or ``None`` is the score compared against the
        metric's threshold (DEFAULT_THRESHOLD if the metric has none).
        """
        success = getattr(self.metric, "success", None)
        if success is not None:
            return bool(success)

        threshold = getattr(self.metric, "threshold", None)
        if threshold is None:
            threshold = self.DEFAULT_THRESHOLD
        # NOTE(review): assumes higher scores are better; confirm for metrics
        # that reach this fallback.
        return score is not None and score >= threshold

    def execute(self, fields: Dict[str, Any]) -> Dict[str, Any]:
        """Run the DeepEval metric and return formatted results.

        Args:
            fields: Field dict with "input", "actual_output" and optionally
                "expected_output", "context" and "retrieval_context".

        Returns:
            {"value": score, "label": "Pass" | "Fail", "explanation": str}
        """
        test_case = LLMTestCase(
            # ``or ""`` also covers an explicit None coming from a field_mapper.
            input=fields.get("input") or "",
            actual_output=fields.get("actual_output") or "",
            expected_output=fields.get("expected_output"),
            context=fields.get("context"),
            retrieval_context=fields.get("retrieval_context"),
        )

        self.metric.measure(test_case)

        score = self.metric.score
        reason = getattr(self.metric, "reason", None) or ""
        label = "Pass" if self._is_successful(score) else "Fail"

        return {"value": score, "label": label, "explanation": reason}
def build_test_case(
    parsed: ParsedEvaluationEvent,
    metric: BaseMetric,
    field_mapper: Optional[Callable[[Dict[str, Any]], Dict[str, Any]]] = None,
) -> LLMTestCase:
    """Build a DeepEval LLMTestCase from a parsed evaluation event.

    Unlike the adapter's handler path, the metric's required fields are
    validated here even when a field_mapper supplies the values.

    Args:
        parsed: The parsed Lambda event.
        metric: The DeepEval metric (used to determine required fields).
        field_mapper: Optional callable that receives the raw Lambda event fields
            and returns a dict of LLMTestCase field values. Bypasses default
            span extraction when provided.

    Returns:
        An LLMTestCase ready for metric.measure().

    Raises:
        ValueError: If required fields for the metric cannot be populated.
        TypeError: If field_mapper returns something other than a dict.
    """
    if field_mapper is None:
        fields = extract_fields_from_spans(parsed)
    else:
        # Rebuild the event in its raw Lambda shape for the mapper.
        raw_event = {
            "evaluationLevel": parsed.evaluation_level,
            "evaluationInput": {"sessionSpans": parsed.session_spans},
            "evaluationTarget": {
                "traceIds": [parsed.target_trace_id] if parsed.target_trace_id else [],
                "spanIds": [parsed.target_span_id] if parsed.target_span_id else [],
            },
            "evaluationReferenceInputs": parsed.reference_inputs,
        }
        fields = field_mapper(raw_event)
        if not isinstance(fields, dict):
            raise TypeError(f"field_mapper must return a dict, got {type(fields).__name__}")

    required = _get_required_params(metric)
    missing = [name for name in required if not fields.get(name)]
    if missing:
        metric_name = type(metric).__name__
        raise ValueError(
            f"Field(s) {missing} required by {metric_name} but not found in evaluation event. "
            f"Provide a field_mapper or ensure spans contain the necessary data."
        )

    return LLMTestCase(
        # ``or ""`` also covers an explicit None for a field the metric does
        # not require.
        input=fields.get("input") or "",
        actual_output=fields.get("actual_output") or "",
        expected_output=fields.get("expected_output"),
        context=fields.get("context"),
        retrieval_context=fields.get("retrieval_context"),
    )
+ + Raises: + ValueError: If required fields for the metric cannot be populated. + """ + if field_mapper is not None: + raw_event = { + "evaluationLevel": parsed.evaluation_level, + "evaluationInput": {"sessionSpans": parsed.session_spans}, + "evaluationTarget": { + "traceIds": [parsed.target_trace_id] if parsed.target_trace_id else [], + "spanIds": [parsed.target_span_id] if parsed.target_span_id else [], + }, + "evaluationReferenceInputs": parsed.reference_inputs, + } + fields = field_mapper(raw_event) + else: + fields = extract_fields_from_spans(parsed) + + required = _get_required_params(metric) + missing = [f for f in required if f not in fields or not fields[f]] + if missing: + metric_name = type(metric).__name__ + raise ValueError( + f"Field(s) {missing} required by {metric_name} but not found in evaluation event. " + f"Provide a field_mapper or ensure spans contain the necessary data." + ) + + return LLMTestCase( + input=fields.get("input", ""), + actual_output=fields.get("actual_output", ""), + expected_output=fields.get("expected_output"), + context=fields.get("context"), + retrieval_context=fields.get("retrieval_context"), + ) + + +# Backward-compatible alias +DeepEvalHandler = DeepEvalAdapter diff --git a/tests/bedrock_agentcore/evaluation/integrations/autoevals/__init__.py b/tests/bedrock_agentcore/evaluation/integrations/autoevals/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/bedrock_agentcore/evaluation/integrations/autoevals/test_adapter.py b/tests/bedrock_agentcore/evaluation/integrations/autoevals/test_adapter.py new file mode 100644 index 00000000..17f674bd --- /dev/null +++ b/tests/bedrock_agentcore/evaluation/integrations/autoevals/test_adapter.py @@ -0,0 +1,217 @@ +"""Tests for AutoevalsAdapter.""" + +import json +import time +from unittest.mock import MagicMock + +import pytest + +from bedrock_agentcore.evaluation.integrations.autoevals.adapter import AutoevalsAdapter + + +def _make_event( + level="TRACE", + 
trace_ids=None, + spans=None, + reference_inputs=None, +): + """Build a raw Lambda event dict for testing.""" + if spans is None: + log_records = [ + { + "body": { + "input": {"messages": [{"role": "user", "content": "What is AI?"}]}, + "output": {"messages": [{"role": "assistant", "content": "AI is artificial intelligence."}]}, + } + } + ] + spans = [ + { + "traceId": "abc123", + "spanId": "span1", + "attributes": {"_eval_log_records": json.dumps(log_records)}, + } + ] + + event = { + "schemaVersion": "1.0", + "evaluationLevel": level, + "evaluationInput": {"sessionSpans": spans}, + "evaluationTarget": {}, + } + if trace_ids is not None: + event["evaluationTarget"]["traceIds"] = trace_ids + if reference_inputs is not None: + event["evaluationReferenceInputs"] = reference_inputs + return event + + +def _mock_scorer(score=0.9, rationale="Good answer"): + """Create a mock Autoevals scorer.""" + scorer = MagicMock() + type(scorer).__name__ = "MockScorer" + + result = MagicMock() + result.score = score + result.metadata = {"rationale": rationale} + + scorer.eval = MagicMock(return_value=result) + return scorer + + +class TestAutoevalsAdapterSuccess: + def test_returns_pass_when_score_above_half(self): + scorer = _mock_scorer(score=0.8) + adapter = AutoevalsAdapter(scorer=scorer) + + result = adapter(_make_event()) + + assert result["value"] == 0.8 + assert result["label"] == "Pass" + assert result["explanation"] == "Good answer" + + def test_returns_fail_when_score_below_half(self): + scorer = _mock_scorer(score=0.3) + adapter = AutoevalsAdapter(scorer=scorer) + + result = adapter(_make_event()) + + assert result["value"] == 0.3 + assert result["label"] == "Fail" + + def test_scorer_eval_called_with_input_and_output(self): + scorer = _mock_scorer() + adapter = AutoevalsAdapter(scorer=scorer) + + adapter(_make_event()) + + scorer.eval.assert_called_once() + call_kwargs = scorer.eval.call_args[1] + assert call_kwargs["input"] == "What is AI?" 
+ assert call_kwargs["output"] == "AI is artificial intelligence." + + def test_expected_output_passed_as_expected(self): + scorer = _mock_scorer() + adapter = AutoevalsAdapter(scorer=scorer) + + refs = [{"expectedResponse": "AI stands for artificial intelligence."}] + result = adapter(_make_event(reference_inputs=refs)) + + call_kwargs = scorer.eval.call_args[1] + assert call_kwargs["expected"] == "AI stands for artificial intelligence." + + def test_no_expected_output_omits_expected_kwarg(self): + scorer = _mock_scorer() + adapter = AutoevalsAdapter(scorer=scorer) + + adapter(_make_event()) + + call_kwargs = scorer.eval.call_args[1] + assert "expected" not in call_kwargs + + def test_custom_field_mapper(self): + scorer = _mock_scorer() + adapter = AutoevalsAdapter( + scorer=scorer, + field_mapper=lambda event: { + "input": "custom input", + "actual_output": "custom output", + }, + ) + + result = adapter(_make_event()) + + call_kwargs = scorer.eval.call_args[1] + assert call_kwargs["input"] == "custom input" + assert call_kwargs["output"] == "custom output" + + +class TestAutoevalsAdapterErrors: + def test_invalid_event_returns_error(self): + scorer = _mock_scorer() + adapter = AutoevalsAdapter(scorer=scorer) + + result = adapter({}) + + assert result["errorCode"] == "INVALID_EVENT" + + def test_missing_input_returns_error(self): + log_records = [ + { + "body": { + "output": {"messages": [{"role": "assistant", "content": "answer"}]}, + } + } + ] + spans = [ + { + "traceId": "t1", + "spanId": "s1", + "attributes": {"_eval_log_records": json.dumps(log_records)}, + } + ] + scorer = _mock_scorer() + adapter = AutoevalsAdapter(scorer=scorer) + + result = adapter(_make_event(spans=spans)) + + assert result["errorCode"] == "MISSING_REQUIRED_FIELD" + assert "input" in result["errorMessage"] + + def test_scorer_exception_returns_error(self): + scorer = _mock_scorer() + scorer.eval = MagicMock(side_effect=RuntimeError("API error")) + adapter = 
AutoevalsAdapter(scorer=scorer) + + result = adapter(_make_event()) + + assert result["errorCode"] == "METRIC_ERROR" + assert "API error" in result["errorMessage"] + + def test_never_raises_on_bad_input(self): + scorer = _mock_scorer() + adapter = AutoevalsAdapter(scorer=scorer) + + for bad_input in [None, [], "string", 42]: + result = adapter(bad_input) + assert "errorCode" in result + + +class TestAutoevalsAdapterTimeout: + def test_timeout_returns_error(self): + scorer = _mock_scorer() + scorer.eval = MagicMock(side_effect=lambda **kw: time.sleep(5)) + adapter = AutoevalsAdapter(scorer=scorer, timeout=1) + + result = adapter(_make_event()) + + assert result["errorCode"] == "METRIC_TIMEOUT" + + def test_default_timeout_is_290(self): + scorer = _mock_scorer() + adapter = AutoevalsAdapter(scorer=scorer) + + assert adapter.timeout == 290 + + +class TestAutoevalsAdapterEdgeCases: + def test_score_none_returns_fail(self): + scorer = _mock_scorer(score=None) + adapter = AutoevalsAdapter(scorer=scorer) + + result = adapter(_make_event()) + + assert result["label"] == "Fail" + + def test_no_metadata_returns_empty_explanation(self): + scorer = MagicMock() + type(scorer).__name__ = "MockScorer" + result_obj = MagicMock(spec=[]) + result_obj.score = 0.9 + scorer.eval = MagicMock(return_value=result_obj) + + adapter = AutoevalsAdapter(scorer=scorer) + + result = adapter(_make_event()) + + assert result["explanation"] == "" diff --git a/tests/bedrock_agentcore/evaluation/integrations/deepeval/__init__.py b/tests/bedrock_agentcore/evaluation/integrations/deepeval/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/bedrock_agentcore/evaluation/integrations/deepeval/test_handler.py b/tests/bedrock_agentcore/evaluation/integrations/deepeval/test_handler.py new file mode 100644 index 00000000..67bfda3d --- /dev/null +++ b/tests/bedrock_agentcore/evaluation/integrations/deepeval/test_handler.py @@ -0,0 +1,427 @@ +"""Tests for DeepEvalHandler and 
DeepEvalAdapter.""" + +import json +import time +from unittest.mock import MagicMock, patch + +import pytest + +from bedrock_agentcore.evaluation.integrations.deepeval.adapter import DeepEvalAdapter, DeepEvalHandler +from bedrock_agentcore.evaluation.integrations.base import BaseAdapter +from bedrock_agentcore.evaluation.custom_code_based_evaluators.models import EvaluatorInput + + +def _make_event( + level="TRACE", + trace_ids=None, + spans=None, + reference_inputs=None, +): + """Build a raw Lambda event dict for testing.""" + if spans is None: + log_records = [ + { + "body": { + "input": {"messages": [{"role": "user", "content": "What is AI?"}]}, + "output": {"messages": [{"role": "assistant", "content": "AI is artificial intelligence."}]}, + } + } + ] + spans = [ + { + "traceId": "abc123", + "spanId": "span1", + "attributes": {"_eval_log_records": json.dumps(log_records)}, + } + ] + + event = { + "schemaVersion": "1.0", + "evaluationLevel": level, + "evaluationInput": {"sessionSpans": spans}, + "evaluationTarget": {}, + } + if trace_ids is not None: + event["evaluationTarget"]["traceIds"] = trace_ids + if reference_inputs is not None: + event["evaluationReferenceInputs"] = reference_inputs + return event + + +def _mock_metric(score=0.85, reason="Looks good", threshold=0.7, name="MockMetric"): + """Create a mock metric that returns a fixed score on measure().""" + metric = MagicMock() + type(metric).__name__ = name + metric.threshold = threshold + metric.score = score + metric.reason = reason + metric._required_params = None + del metric._required_params + del metric.evaluation_params + del metric.success + + def measure_side_effect(test_case): + metric.score = score + metric.reason = reason + + metric.measure = MagicMock(side_effect=measure_side_effect) + return metric + + +class TestDeepEvalHandlerSuccess: + def test_returns_pass_when_score_above_threshold(self): + metric = _mock_metric(score=0.9, threshold=0.7) + handler = DeepEvalHandler(metric=metric) + + 
result = handler(_make_event()) + + assert result["value"] == 0.9 + assert result["label"] == "Pass" + assert result["explanation"] == "Looks good" + + def test_returns_fail_when_score_below_threshold(self): + metric = _mock_metric(score=0.3, threshold=0.7) + handler = DeepEvalHandler(metric=metric) + + result = handler(_make_event()) + + assert result["value"] == 0.3 + assert result["label"] == "Fail" + + def test_returns_pass_at_exact_threshold(self): + metric = _mock_metric(score=0.7, threshold=0.7) + handler = DeepEvalHandler(metric=metric) + + result = handler(_make_event()) + + assert result["label"] == "Pass" + + def test_metric_measure_called_with_test_case(self): + metric = _mock_metric() + handler = DeepEvalHandler(metric=metric) + + handler(_make_event()) + + metric.measure.assert_called_once() + test_case = metric.measure.call_args[0][0] + assert test_case.input == "What is AI?" + assert test_case.actual_output == "AI is artificial intelligence." + + def test_context_parameter_ignored(self): + metric = _mock_metric() + handler = DeepEvalHandler(metric=metric) + mock_context = {"function_name": "my-lambda"} + + result = handler(_make_event(), mock_context) + + assert result["value"] == 0.85 + + def test_custom_field_mapper(self): + metric = _mock_metric() + handler = DeepEvalHandler( + metric=metric, + field_mapper=lambda event: { + "input": "mapped input", + "actual_output": "mapped output", + }, + ) + + result = handler(_make_event()) + + assert result["value"] == 0.85 + test_case = metric.measure.call_args[0][0] + assert test_case.input == "mapped input" + assert test_case.actual_output == "mapped output" + + +class TestDeepEvalHandlerErrors: + def test_invalid_event_returns_error(self): + metric = _mock_metric() + handler = DeepEvalHandler(metric=metric) + + result = handler({}) + + assert result["errorCode"] == "INVALID_EVENT" + assert "errorMessage" in result + assert "value" not in result + + def test_missing_evaluation_input_returns_error(self): 
+ metric = _mock_metric() + handler = DeepEvalHandler(metric=metric) + + event = {"evaluationLevel": "TRACE", "evaluationTarget": {}} + result = handler(event) + + assert result["errorCode"] == "INVALID_EVENT" + + def test_missing_required_field_returns_error(self): + log_records = [ + { + "body": { + "input": {"messages": [{"role": "user", "content": "q"}]}, + "output": {"messages": [{"role": "assistant", "content": "a"}]}, + } + } + ] + spans = [ + { + "traceId": "t1", + "spanId": "s1", + "attributes": {"_eval_log_records": json.dumps(log_records)}, + } + ] + metric = _mock_metric(name="FaithfulnessMetric") + handler = DeepEvalHandler(metric=metric) + + event = _make_event(spans=spans) + result = handler(event) + + assert result["errorCode"] == "MISSING_REQUIRED_FIELD" + assert "retrieval_context" in result["errorMessage"] + + def test_metric_measure_exception_returns_error(self): + metric = _mock_metric() + metric.measure = MagicMock(side_effect=RuntimeError("LLM timeout")) + handler = DeepEvalHandler(metric=metric) + + result = handler(_make_event()) + + assert result["errorCode"] == "METRIC_ERROR" + assert "LLM timeout" in result["errorMessage"] + + def test_never_raises_on_any_input(self): + metric = _mock_metric() + handler = DeepEvalHandler(metric=metric) + + for bad_input in [None, [], "string", 42, {"random": "keys"}]: + result = handler(bad_input) + assert "errorCode" in result or "value" in result + + +class TestDeepEvalHandlerEdgeCases: + def test_metric_with_no_reason(self): + metric = _mock_metric(score=0.8, reason=None) + handler = DeepEvalHandler(metric=metric) + + result = handler(_make_event()) + + assert result["explanation"] == "" + + def test_metric_score_zero(self): + metric = _mock_metric(score=0.0, threshold=0.5) + handler = DeepEvalHandler(metric=metric) + + result = handler(_make_event()) + + assert result["value"] == 0.0 + assert result["label"] == "Fail" + + def test_metric_score_one(self): + metric = _mock_metric(score=1.0, 
threshold=0.5) + handler = DeepEvalHandler(metric=metric) + + result = handler(_make_event()) + + assert result["value"] == 1.0 + assert result["label"] == "Pass" + + def test_default_threshold_when_missing(self): + metric = _mock_metric(score=0.6) + del metric.threshold + handler = DeepEvalHandler(metric=metric) + + result = handler(_make_event()) + + assert result["label"] == "Pass" + + def test_label_uses_metric_success_true(self): + metric = _mock_metric(score=0.3, threshold=0.7) + metric.success = True + handler = DeepEvalHandler(metric=metric) + + result = handler(_make_event()) + + assert result["value"] == 0.3 + assert result["label"] == "Pass" + + def test_label_uses_metric_success_false(self): + metric = _mock_metric(score=0.9, threshold=0.7) + metric.success = False + handler = DeepEvalHandler(metric=metric) + + result = handler(_make_event()) + + assert result["value"] == 0.9 + assert result["label"] == "Fail" + + def test_label_falls_back_to_threshold_when_no_success(self): + metric = _mock_metric(score=0.8, threshold=0.7) + handler = DeepEvalHandler(metric=metric) + + result = handler(_make_event()) + + assert result["label"] == "Pass" + + def test_model_override_sets_metric_model(self): + metric = _mock_metric() + handler = DeepEvalHandler(metric=metric, model="bedrock/anthropic.claude-3") + + assert metric.model == "bedrock/anthropic.claude-3" + + def test_no_model_override_leaves_metric_unchanged(self): + metric = _mock_metric() + metric.model = "original-model" + handler = DeepEvalHandler(metric=metric) + + handler(_make_event()) + + assert metric.model == "original-model" + + +class TestDeepEvalHandlerTimeout: + def test_timeout_returns_error(self): + metric = _mock_metric() + metric.measure = MagicMock(side_effect=lambda tc: time.sleep(5)) + handler = DeepEvalHandler(metric=metric, timeout=1) + + result = handler(_make_event()) + + assert result["errorCode"] == "METRIC_TIMEOUT" + assert "1s timeout" in result["errorMessage"] + + def 
test_no_timeout_when_measure_completes_in_time(self): + metric = _mock_metric() + handler = DeepEvalHandler(metric=metric, timeout=10) + + result = handler(_make_event()) + + assert result["value"] == 0.85 + assert "errorCode" not in result + + def test_default_timeout_is_290(self): + metric = _mock_metric() + handler = DeepEvalHandler(metric=metric) + + assert handler.timeout == 290 + + def test_custom_timeout_value(self): + metric = _mock_metric() + handler = DeepEvalHandler(metric=metric, timeout=60) + + assert handler.timeout == 60 + + def test_metric_exception_still_propagates_with_timeout(self): + metric = _mock_metric() + metric.measure = MagicMock(side_effect=RuntimeError("LLM error")) + handler = DeepEvalHandler(metric=metric, timeout=10) + + result = handler(_make_event()) + + assert result["errorCode"] == "METRIC_ERROR" + assert "LLM error" in result["errorMessage"] + + +class TestBackwardCompatibility: + def test_handler_is_alias_for_adapter(self): + assert DeepEvalHandler is DeepEvalAdapter + + def test_adapter_is_subclass_of_base(self): + assert issubclass(DeepEvalAdapter, BaseAdapter) + + def test_import_from_init(self): + from bedrock_agentcore.evaluation.integrations.deepeval import DeepEvalHandler as H + from bedrock_agentcore.evaluation.integrations.deepeval import DeepEvalAdapter as A + + assert H is A + + def test_handler_works_same_as_before(self): + metric = _mock_metric(score=0.9, threshold=0.7) + handler = DeepEvalHandler(metric=metric) + + result = handler(_make_event()) + + assert result["value"] == 0.9 + assert result["label"] == "Pass" + + +class TestEvaluatorInputAcceptance: + def _make_evaluator_input(self): + log_records = [ + { + "body": { + "input": {"messages": [{"role": "user", "content": "Hello"}]}, + "output": {"messages": [{"role": "assistant", "content": "Hi there"}]}, + } + } + ] + spans = [ + { + "traceId": "t1", + "spanId": "s1", + "attributes": {"_eval_log_records": json.dumps(log_records)}, + } + ] + return 
EvaluatorInput( + evaluation_level="TRACE", + session_spans=spans, + target_trace_id="t1", + target_span_id=None, + ) + + def test_accepts_evaluator_input(self): + metric = _mock_metric(score=0.95) + handler = DeepEvalHandler(metric=metric) + + result = handler(self._make_evaluator_input()) + + assert result["value"] == 0.95 + assert result["label"] == "Pass" + + def test_evaluator_input_extracts_fields_correctly(self): + metric = _mock_metric() + handler = DeepEvalHandler(metric=metric) + + handler(self._make_evaluator_input()) + + test_case = metric.measure.call_args[0][0] + assert test_case.input == "Hello" + assert test_case.actual_output == "Hi there" + + def test_evaluator_input_with_trace_id_filtering(self): + log_records = [ + { + "traceId": "target", + "body": { + "input": {"messages": [{"role": "user", "content": "relevant"}]}, + "output": {"messages": [{"role": "assistant", "content": "yes"}]}, + }, + }, + { + "traceId": "other", + "body": { + "input": {"messages": [{"role": "user", "content": "irrelevant"}]}, + "output": {"messages": [{"role": "assistant", "content": "no"}]}, + }, + }, + ] + spans = [ + { + "traceId": "t1", + "spanId": "s1", + "attributes": {"_eval_log_records": json.dumps(log_records)}, + } + ] + evaluator_input = EvaluatorInput( + evaluation_level="TRACE", + session_spans=spans, + target_trace_id="target", + ) + + metric = _mock_metric() + handler = DeepEvalHandler(metric=metric) + + handler(evaluator_input) + + test_case = metric.measure.call_args[0][0] + assert test_case.input == "relevant" + assert test_case.actual_output == "yes" diff --git a/tests/bedrock_agentcore/evaluation/integrations/deepeval/test_input_mapper.py b/tests/bedrock_agentcore/evaluation/integrations/deepeval/test_input_mapper.py new file mode 100644 index 00000000..2d6fbaea --- /dev/null +++ b/tests/bedrock_agentcore/evaluation/integrations/deepeval/test_input_mapper.py @@ -0,0 +1,581 @@ +"""Tests for deepeval input mapping and test case building.""" + +import 
json +from unittest.mock import MagicMock + +import pytest +from deepeval.test_case import SingleTurnParams + +from bedrock_agentcore.evaluation.integrations.base import ( + ParsedEvaluationEvent, + extract_fields_from_spans as _extract_fields_from_spans, +) +from bedrock_agentcore.evaluation.integrations.deepeval.adapter import ( + _get_required_params, + build_test_case, +) + + +def _make_log_record( + input_messages=None, + output_messages=None, + trace_id=None, +): + """Build a single log record dict.""" + record = {"body": {}} + if input_messages is not None: + record["body"]["input"] = {"messages": input_messages} + if output_messages is not None: + record["body"]["output"] = {"messages": output_messages} + if trace_id is not None: + record["traceId"] = trace_id + return record + + +def _make_span_with_log_records(log_records, span_id="span1", as_json_string=True): + """Build a span dict with _eval_log_records in attributes.""" + value = json.dumps(log_records) if as_json_string else log_records + return { + "traceId": "abc123", + "spanId": span_id, + "attributes": {"_eval_log_records": value}, + } + + +def _make_event( + level="TRACE", + trace_ids=None, + span_ids=None, + spans=None, + reference_inputs=None, +): + """Build a raw Lambda event dict for testing.""" + if spans is None: + log_records = [ + _make_log_record( + input_messages=[{"role": "user", "content": "What is the capital of France?"}], + output_messages=[{"role": "assistant", "content": "The capital of France is Paris."}], + ) + ] + spans = [_make_span_with_log_records(log_records)] + + event = { + "schemaVersion": "1.0", + "evaluationLevel": level, + "evaluationInput": {"sessionSpans": spans}, + "evaluationTarget": {}, + } + if trace_ids is not None: + event["evaluationTarget"]["traceIds"] = trace_ids + if span_ids is not None: + event["evaluationTarget"]["spanIds"] = span_ids + if reference_inputs is not None: + event["evaluationReferenceInputs"] = reference_inputs + return event + + +def 
_mock_metric(name="MockMetric", required_params=None, evaluation_params=None, threshold=0.5): + """Create a mock DeepEval metric.""" + metric = MagicMock() + type(metric).__name__ = name + metric.threshold = threshold + + if required_params is not None: + metric._required_params = required_params + else: + del metric._required_params + + if evaluation_params is not None: + metric.evaluation_params = evaluation_params + else: + del metric.evaluation_params + + return metric + + +class TestParsedEvaluationEvent: + def test_from_lambda_event_trace_level(self): + event = _make_event(level="TRACE", trace_ids=["trace-1"]) + parsed = ParsedEvaluationEvent.from_lambda_event(event) + + assert parsed.evaluation_level == "TRACE" + assert parsed.target_trace_id == "trace-1" + assert parsed.target_span_id is None + assert len(parsed.session_spans) == 1 + + def test_from_lambda_event_tool_call_level(self): + event = _make_event(level="TOOL_CALL", span_ids=["span-42"]) + parsed = ParsedEvaluationEvent.from_lambda_event(event) + + assert parsed.evaluation_level == "TOOL_CALL" + assert parsed.target_span_id == "span-42" + assert parsed.target_trace_id is None + + def test_from_lambda_event_session_level(self): + event = _make_event(level="SESSION") + parsed = ParsedEvaluationEvent.from_lambda_event(event) + + assert parsed.evaluation_level == "SESSION" + assert parsed.target_trace_id is None + assert parsed.target_span_id is None + + def test_from_lambda_event_with_reference_inputs(self): + refs = [{"expectedResponse": "Paris is the capital of France."}] + event = _make_event(reference_inputs=refs) + parsed = ParsedEvaluationEvent.from_lambda_event(event) + + assert parsed.reference_inputs == refs + + def test_from_lambda_event_missing_reference_inputs(self): + event = _make_event() + parsed = ParsedEvaluationEvent.from_lambda_event(event) + + assert parsed.reference_inputs == [] + + def test_from_lambda_event_missing_evaluation_level_raises(self): + event = _make_event() + del 
event["evaluationLevel"] + + with pytest.raises(KeyError): + ParsedEvaluationEvent.from_lambda_event(event) + + def test_from_lambda_event_missing_evaluation_input_raises(self): + event = _make_event() + del event["evaluationInput"] + + with pytest.raises(KeyError): + ParsedEvaluationEvent.from_lambda_event(event) + + def test_from_lambda_event_missing_target_key_defaults(self): + event = _make_event() + del event["evaluationTarget"] + parsed = ParsedEvaluationEvent.from_lambda_event(event) + + assert parsed.target_trace_id is None + assert parsed.target_span_id is None + + +class TestGetRequiredParams: + def test_uses_required_params_attribute(self): + metric = _mock_metric( + required_params=[SingleTurnParams.INPUT, SingleTurnParams.ACTUAL_OUTPUT] + ) + result = _get_required_params(metric) + + assert result == ["input", "actual_output"] + + def test_falls_back_to_static_registry(self): + metric = _mock_metric(name="FaithfulnessMetric") + result = _get_required_params(metric) + + assert result == ["input", "actual_output", "retrieval_context"] + + def test_falls_back_to_evaluation_params(self): + metric = _mock_metric( + name="UnknownMetric", + evaluation_params=[SingleTurnParams.INPUT, SingleTurnParams.RETRIEVAL_CONTEXT], + ) + result = _get_required_params(metric) + + assert result == ["input", "retrieval_context"] + + def test_defaults_to_input_and_actual_output(self): + metric = _mock_metric(name="UnknownMetric") + result = _get_required_params(metric) + + assert result == ["input", "actual_output"] + + def test_unmappable_required_params_skips_to_static_registry(self): + metric = _mock_metric(name="GEval", required_params=["SomeTypingObject", "AnotherType"]) + result = _get_required_params(metric) + + assert result == ["input", "actual_output"] + + def test_unmappable_required_params_falls_to_default(self): + metric = _mock_metric(name="UnknownMetric", required_params=["SomeTypingObject"]) + result = _get_required_params(metric) + + assert result == 
["input", "actual_output"] + + def test_empty_required_params_falls_through(self): + metric = _mock_metric(name="UnknownMetric", required_params=[]) + result = _get_required_params(metric) + + assert result == ["input", "actual_output"] + + +class TestExtractFieldsFromSpans: + def test_basic_extraction(self): + log_records = [ + _make_log_record( + input_messages=[{"role": "user", "content": "hello"}], + output_messages=[{"role": "assistant", "content": "world"}], + ) + ] + spans = [_make_span_with_log_records(log_records)] + parsed = ParsedEvaluationEvent( + evaluation_level="TRACE", session_spans=spans + ) + + fields = _extract_fields_from_spans(parsed) + + assert fields["input"] == "hello" + assert fields["actual_output"] == "world" + + def test_tool_messages_become_retrieval_context(self): + log_records = [ + _make_log_record( + input_messages=[{"role": "user", "content": "query"}], + output_messages=[ + {"role": "tool", "content": "doc chunk 1"}, + {"role": "tool", "content": "doc chunk 2"}, + {"role": "assistant", "content": "answer"}, + ], + ) + ] + spans = [_make_span_with_log_records(log_records)] + parsed = ParsedEvaluationEvent( + evaluation_level="TRACE", session_spans=spans + ) + + fields = _extract_fields_from_spans(parsed) + + assert fields["retrieval_context"] == ["doc chunk 1", "doc chunk 2"] + assert fields["actual_output"] == "answer" + + def test_tool_messages_also_set_context_for_hallucination_metric(self): + log_records = [ + _make_log_record( + input_messages=[{"role": "user", "content": "query"}], + output_messages=[ + {"role": "tool", "content": "context chunk"}, + {"role": "assistant", "content": "answer"}, + ], + ) + ] + spans = [_make_span_with_log_records(log_records)] + parsed = ParsedEvaluationEvent( + evaluation_level="TRACE", session_spans=spans + ) + + fields = _extract_fields_from_spans(parsed) + + assert fields["context"] == ["context chunk"] + assert fields["context"] == fields["retrieval_context"] + + def 
test_message_content_as_dict_with_content_key(self): + log_records = [ + _make_log_record( + input_messages=[{"role": "user", "content": {"content": "nested content"}}], + output_messages=[{"role": "assistant", "content": {"content": "nested output"}}], + ) + ] + spans = [_make_span_with_log_records(log_records)] + parsed = ParsedEvaluationEvent( + evaluation_level="TRACE", session_spans=spans + ) + + fields = _extract_fields_from_spans(parsed) + + assert fields["input"] == "nested content" + assert fields["actual_output"] == "nested output" + + def test_message_content_as_dict_with_message_key(self): + log_records = [ + _make_log_record( + input_messages=[{"role": "user", "message": "msg key input"}], + output_messages=[{"role": "assistant", "message": "msg key output"}], + ) + ] + spans = [_make_span_with_log_records(log_records)] + parsed = ParsedEvaluationEvent( + evaluation_level="TRACE", session_spans=spans + ) + + fields = _extract_fields_from_spans(parsed) + + assert fields["input"] == "msg key input" + assert fields["actual_output"] == "msg key output" + + def test_message_content_as_plain_string_in_content_field(self): + log_records = [ + _make_log_record( + input_messages=[{"role": "user", "content": "plain string"}], + output_messages=[{"role": "assistant", "content": "plain response"}], + ) + ] + spans = [_make_span_with_log_records(log_records)] + parsed = ParsedEvaluationEvent( + evaluation_level="TRACE", session_spans=spans + ) + + fields = _extract_fields_from_spans(parsed) + + assert fields["input"] == "plain string" + assert fields["actual_output"] == "plain response" + + def test_target_trace_id_filters_records(self): + log_records = [ + _make_log_record( + input_messages=[{"role": "user", "content": "relevant"}], + output_messages=[{"role": "assistant", "content": "relevant answer"}], + trace_id="target-trace", + ), + _make_log_record( + input_messages=[{"role": "user", "content": "irrelevant"}], + output_messages=[{"role": "assistant", 
"content": "irrelevant answer"}], + trace_id="other-trace", + ), + ] + spans = [_make_span_with_log_records(log_records)] + parsed = ParsedEvaluationEvent( + evaluation_level="TRACE", + session_spans=spans, + target_trace_id="target-trace", + ) + + fields = _extract_fields_from_spans(parsed) + + assert fields["input"] == "relevant" + assert fields["actual_output"] == "relevant answer" + + def test_no_target_trace_id_includes_all_records(self): + log_records = [ + _make_log_record( + input_messages=[{"role": "user", "content": "first"}], + output_messages=[{"role": "assistant", "content": "first answer"}], + trace_id="trace-1", + ), + _make_log_record( + input_messages=[{"role": "user", "content": "second"}], + output_messages=[{"role": "assistant", "content": "second answer"}], + trace_id="trace-2", + ), + ] + spans = [_make_span_with_log_records(log_records)] + parsed = ParsedEvaluationEvent( + evaluation_level="SESSION", session_spans=spans + ) + + fields = _extract_fields_from_spans(parsed) + + assert fields["input"] == "first\nsecond" + assert fields["actual_output"] == "first answer\nsecond answer" + + def test_log_records_as_parsed_list(self): + log_records = [ + _make_log_record( + input_messages=[{"role": "user", "content": "from list"}], + output_messages=[{"role": "assistant", "content": "from list answer"}], + ) + ] + spans = [_make_span_with_log_records(log_records, as_json_string=False)] + parsed = ParsedEvaluationEvent( + evaluation_level="TRACE", session_spans=spans + ) + + fields = _extract_fields_from_spans(parsed) + + assert fields["input"] == "from list" + assert fields["actual_output"] == "from list answer" + + def test_invalid_json_log_records_skipped(self): + spans = [ + { + "traceId": "t1", + "spanId": "s1", + "attributes": {"_eval_log_records": "not valid json{{{"}, + } + ] + parsed = ParsedEvaluationEvent( + evaluation_level="TRACE", session_spans=spans + ) + + fields = _extract_fields_from_spans(parsed) + + assert fields == {} + + def 
test_span_without_log_records_skipped(self): + spans = [{"traceId": "t1", "spanId": "s1", "attributes": {}}] + parsed = ParsedEvaluationEvent( + evaluation_level="TRACE", session_spans=spans + ) + + fields = _extract_fields_from_spans(parsed) + + assert fields == {} + + def test_multiple_spans_aggregated(self): + log_records_1 = [ + _make_log_record( + input_messages=[{"role": "user", "content": "q1"}], + output_messages=[{"role": "assistant", "content": "a1"}], + ) + ] + log_records_2 = [ + _make_log_record( + input_messages=[{"role": "user", "content": "q2"}], + output_messages=[{"role": "assistant", "content": "a2"}], + ) + ] + spans = [ + _make_span_with_log_records(log_records_1, span_id="s1"), + _make_span_with_log_records(log_records_2, span_id="s2"), + ] + parsed = ParsedEvaluationEvent( + evaluation_level="SESSION", session_spans=spans + ) + + fields = _extract_fields_from_spans(parsed) + + assert fields["input"] == "q1\nq2" + assert fields["actual_output"] == "a1\na2" + + def test_reference_inputs_expected_output(self): + log_records = [ + _make_log_record( + input_messages=[{"role": "user", "content": "q"}], + output_messages=[{"role": "assistant", "content": "a"}], + ) + ] + spans = [_make_span_with_log_records(log_records)] + parsed = ParsedEvaluationEvent( + evaluation_level="TRACE", + session_spans=spans, + reference_inputs=[{"expectedResponse": "expected answer"}], + ) + + fields = _extract_fields_from_spans(parsed) + + assert fields["expected_output"] == "expected answer" + + def test_record_without_matching_trace_id_key_included(self): + log_records = [ + _make_log_record( + input_messages=[{"role": "user", "content": "no trace id record"}], + output_messages=[{"role": "assistant", "content": "response"}], + ), + ] + spans = [_make_span_with_log_records(log_records)] + parsed = ParsedEvaluationEvent( + evaluation_level="TRACE", + session_spans=spans, + target_trace_id="target-trace", + ) + + fields = _extract_fields_from_spans(parsed) + + assert 
fields["input"] == "no trace id record" + + +class TestBuildTestCase: + def test_basic_span_extraction(self): + event = _make_event() + parsed = ParsedEvaluationEvent.from_lambda_event(event) + metric = _mock_metric(name="AnswerRelevancyMetric") + + test_case = build_test_case(parsed, metric) + + assert test_case.input == "What is the capital of France?" + assert test_case.actual_output == "The capital of France is Paris." + + def test_retrieval_context_from_tool_messages(self): + log_records = [ + _make_log_record( + input_messages=[{"role": "user", "content": "query"}], + output_messages=[ + {"role": "tool", "content": "doc chunk 1"}, + {"role": "tool", "content": "doc chunk 2"}, + {"role": "assistant", "content": "answer"}, + ], + ) + ] + spans = [_make_span_with_log_records(log_records)] + event = _make_event(spans=spans) + parsed = ParsedEvaluationEvent.from_lambda_event(event) + metric = _mock_metric(name="FaithfulnessMetric") + + test_case = build_test_case(parsed, metric) + + assert test_case.input == "query" + assert test_case.actual_output == "answer" + assert test_case.retrieval_context == ["doc chunk 1", "doc chunk 2"] + + def test_expected_output_from_reference_inputs(self): + refs = [{"expectedResponse": "Paris"}] + event = _make_event(reference_inputs=refs) + parsed = ParsedEvaluationEvent.from_lambda_event(event) + metric = _mock_metric(name="AnswerRelevancyMetric") + + test_case = build_test_case(parsed, metric) + + assert test_case.expected_output == "Paris" + + def test_missing_required_field_raises_value_error(self): + log_records = [ + _make_log_record( + input_messages=[{"role": "user", "content": "query"}], + output_messages=[{"role": "assistant", "content": "answer"}], + ) + ] + spans = [_make_span_with_log_records(log_records)] + event = _make_event(spans=spans) + parsed = ParsedEvaluationEvent.from_lambda_event(event) + metric = _mock_metric(name="FaithfulnessMetric") + + with pytest.raises(ValueError, match="retrieval_context"): + 
build_test_case(parsed, metric) + + def test_custom_field_mapper_bypasses_extraction(self): + event = _make_event() + parsed = ParsedEvaluationEvent.from_lambda_event(event) + metric = _mock_metric(name="AnswerRelevancyMetric") + + def custom_mapper(raw_event): + return { + "input": "custom input", + "actual_output": "custom output", + } + + test_case = build_test_case(parsed, metric, field_mapper=custom_mapper) + + assert test_case.input == "custom input" + assert test_case.actual_output == "custom output" + + def test_field_mapper_receives_reconstructed_event(self): + refs = [{"expectedResponse": "expected"}] + event = _make_event(level="TRACE", trace_ids=["t1"], reference_inputs=refs) + parsed = ParsedEvaluationEvent.from_lambda_event(event) + metric = _mock_metric(name="AnswerRelevancyMetric") + + received_events = [] + + def capture_mapper(raw_event): + received_events.append(raw_event) + return {"input": "x", "actual_output": "y"} + + build_test_case(parsed, metric, field_mapper=capture_mapper) + + raw = received_events[0] + assert raw["evaluationLevel"] == "TRACE" + assert raw["evaluationTarget"]["traceIds"] == ["t1"] + assert raw["evaluationReferenceInputs"] == refs + + def test_multiple_user_messages_concatenated(self): + log_records = [ + _make_log_record( + input_messages=[ + {"role": "user", "content": "hello"}, + {"role": "user", "content": "world"}, + ], + output_messages=[{"role": "assistant", "content": "hi"}], + ) + ] + spans = [_make_span_with_log_records(log_records)] + event = _make_event(spans=spans) + parsed = ParsedEvaluationEvent.from_lambda_event(event) + metric = _mock_metric(name="AnswerRelevancyMetric") + + test_case = build_test_case(parsed, metric) + + assert test_case.input == "hello\nworld"