-
Notifications
You must be signed in to change notification settings - Fork 125
feat: Add generic BaseAdapter framework for third-party evaluator integration (DeepEval + Autoevals) #528
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
feat: Add generic BaseAdapter framework for third-party evaluator integration (DeepEval + Autoevals) #528
Changes from all commits
ba80889
b0d9682
81a46dd
3080e40
34674bb
6aedcbf
2260eb3
14f0354
b109a64
4e74926
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -229,3 +229,4 @@ local_settings.py | |
| Dockerfile | ||
| CLAUDE.md | ||
| .omc/ | ||
| .deepeval/ | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1 +1,5 @@ | ||
| """AgentCore Evaluation integrations.""" | ||
|
|
||
| from bedrock_agentcore.evaluation.integrations.base import BaseAdapter, ParsedEvaluationEvent | ||
|
|
||
| __all__ = ["BaseAdapter", "ParsedEvaluationEvent"] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| """Autoevals integration for AgentCore Evaluation.""" | ||
|
|
||
| from bedrock_agentcore.evaluation.integrations.autoevals.adapter import AutoevalsAdapter | ||
|
|
||
| __all__ = ["AutoevalsAdapter"] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,72 @@ | ||
| """Autoevals adapter for AgentCore evaluation integrations.""" | ||
|
|
||
| import logging | ||
| from typing import Any, Callable, Dict, Optional | ||
|
|
||
| from bedrock_agentcore.evaluation.integrations.base import BaseAdapter | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
class AutoevalsAdapter(BaseAdapter):
    """Adapter that runs an Autoevals scorer against AgentCore evaluation events.

    Example::

        from autoevals import Factuality

        scorer = Factuality()
        handler = AutoevalsAdapter(scorer=scorer)

        # Use as Lambda handler
        def lambda_handler(event, context):
            return handler(event, context)

    Scorers whose scores are not centred on 0.5 can supply their own cutoff::

        handler = AutoevalsAdapter(scorer=scorer, pass_threshold=0.8)
    """

    # Score at or above which a result is labelled "Pass" when the caller does
    # not supply pass_threshold. Matches the previously hard-coded cutoff.
    DEFAULT_PASS_THRESHOLD = 0.5

    def __init__(
        self,
        scorer: Any,
        field_mapper: Optional[Callable[[Dict[str, Any]], Dict[str, Any]]] = None,
        timeout: Optional[int] = None,
        pass_threshold: Optional[float] = None,
    ):
        """Initialize the adapter.

        Args:
            scorer: An Autoevals scorer instance (e.g. Factuality(), ClosedQA()).
            field_mapper: Optional callable that receives the raw Lambda event and
                returns a dict of field values. Bypasses default span extraction.
            timeout: Maximum seconds to allow for scorer.eval(). When None, the
                base adapter's default is used.
            pass_threshold: Minimum score (inclusive) that is labelled "Pass".
                When None, DEFAULT_PASS_THRESHOLD (0.5) is used.
        """
        super().__init__(field_mapper=field_mapper, timeout=timeout)
        self.scorer = scorer
        self.pass_threshold = pass_threshold if pass_threshold is not None else self.DEFAULT_PASS_THRESHOLD

    def validate_fields(self, fields: Dict[str, Any]) -> None:
        """Validate that input and actual_output are present and non-empty.

        Args:
            fields: Extracted field dict.

        Raises:
            ValueError: If "input" or "actual_output" is missing or falsy.
        """
        missing = [name for name in ("input", "actual_output") if not fields.get(name)]
        if missing:
            scorer_name = type(self.scorer).__name__
            raise ValueError(
                f"Field(s) {missing} required by {scorer_name} but not found in evaluation event. "
                f"Provide a field_mapper or ensure spans contain the necessary data."
            )

    def execute(self, fields: Dict[str, Any]) -> Dict[str, Any]:
        """Run the Autoevals scorer and return formatted results.

        Args:
            fields: Extracted field dict. "input" and "actual_output" are passed
                to the scorer as input/output; "expected_output" is passed as
                expected only when it is non-empty.

        Returns:
            {"value": score, "label": "Pass" | "Fail", "explanation": str}
        """
        kwargs: Dict[str, Any] = {
            "input": fields.get("input", ""),
            "output": fields.get("actual_output", ""),
        }
        if fields.get("expected_output"):
            kwargs["expected"] = fields["expected_output"]

        result = self.scorer.eval(**kwargs)

        score = result.score
        # A missing score can never pass, whatever the threshold is.
        passed = score is not None and score >= self.pass_threshold
        label = "Pass" if passed else "Fail"

        # metadata may be absent, None, or not a dict depending on the scorer;
        # none of those should turn a successful evaluation into an error.
        metadata = getattr(result, "metadata", None)
        explanation = ""
        if isinstance(metadata, dict):
            explanation = metadata.get("rationale") or ""

        return {"value": score, "label": label, "explanation": explanation}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,302 @@ | ||
| """Base adapter for AgentCore evaluation integrations.""" | ||
|
|
||
| import abc | ||
| import json | ||
| import logging | ||
| import threading | ||
| from dataclasses import dataclass, field | ||
| from typing import Any, Callable, Dict, List, Optional, Union | ||
|
|
||
| from bedrock_agentcore.evaluation.custom_code_based_evaluators.models import EvaluatorInput | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
@dataclass
class ParsedEvaluationEvent:
    """Structured view of the Lambda event sent by the AgentCore evaluation service."""

    evaluation_level: str
    session_spans: List[Dict[str, Any]]
    target_trace_id: Optional[str] = None
    target_span_id: Optional[str] = None
    reference_inputs: List[Dict[str, Any]] = field(default_factory=list)

    @classmethod
    def from_lambda_event(cls, event: Dict[str, Any]) -> "ParsedEvaluationEvent":
        """Build a ParsedEvaluationEvent from a raw Lambda event dict.

        Only the first entry of evaluationTarget.traceIds / spanIds is kept.
        A missing or null evaluationTarget or evaluationReferenceInputs is
        treated as empty.

        Args:
            event: Raw Lambda event payload from the evaluation service.

        Returns:
            ParsedEvaluationEvent with extracted fields.

        Raises:
            KeyError: If "evaluationInput", "evaluationLevel" or
                evaluationInput["sessionSpans"] is missing.
        """
        # Lookups happen in the same order as before so the same exception
        # surfaces first for a malformed event.
        payload = event["evaluationInput"]
        target_section = event.get("evaluationTarget") or {}
        candidate_trace_ids = target_section.get("traceIds") or []
        candidate_span_ids = target_section.get("spanIds") or []

        level = event["evaluationLevel"]
        spans = payload["sessionSpans"]

        first_trace_id = None
        if candidate_trace_ids:
            first_trace_id = candidate_trace_ids[0]

        first_span_id = None
        if candidate_span_ids:
            first_span_id = candidate_span_ids[0]

        references = event.get("evaluationReferenceInputs") or []

        return cls(
            evaluation_level=level,
            session_spans=spans,
            target_trace_id=first_trace_id,
            target_span_id=first_span_id,
            reference_inputs=references,
        )
|
|
||
|
|
||
| def _get_message_content(message: Any) -> str: | ||
| """Extract text content from a message object. | ||
|
|
||
| Message content can be a dict with a "content" or "message" key, or a plain string. | ||
| Handles one level of nesting (e.g. {"content": {"content": "text"}}). | ||
| """ | ||
| if isinstance(message, str): | ||
| return message | ||
| if isinstance(message, dict): | ||
| for key in ("content", "message"): | ||
| if key in message: | ||
| val = message[key] | ||
| if isinstance(val, str): | ||
| return val | ||
| if isinstance(val, dict): | ||
| return _get_message_content(val) | ||
| return str(val) | ||
| return "" | ||
|
|
||
|
|
||
def extract_fields_from_spans(
    parsed: ParsedEvaluationEvent,
) -> Dict[str, Any]:
    """Extract evaluation fields from AgentCore session spans.

    Parses _eval_log_records from span attributes, filters by target_trace_id,
    and extracts messages by role:
    - input ← input messages where role=="user"
    - actual_output ← output messages where role=="assistant"
    - retrieval_context ← output messages where role=="tool"
    - context ← same as retrieval_context
    - expected_output ← evaluationReferenceInputs[0].expectedResponse

    Malformed spans, records, bodies and message lists are skipped rather than
    raising, so a partially broken event still yields whatever can be read.

    Args:
        parsed: The parsed evaluation event.

    Returns:
        Dict containing only the fields that were found. "input" and
        "actual_output" are newline-joined strings; "retrieval_context" and
        "context" are lists of strings.
    """
    # NOTE(review): a reviewer asked which OTel agent semantic convention the
    # _eval_log_records attribute follows, noting they have not seen any agent
    # SDK emit it. The producer of this attribute is not visible here — confirm.
    collected: Dict[str, List[str]] = {"user": [], "assistant": [], "tool": []}

    def collect(section: Any, wanted_roles: tuple) -> None:
        """Append non-empty message texts from one body section, keyed by role."""
        if not isinstance(section, dict):
            return
        messages = section.get("messages")
        if not isinstance(messages, (list, tuple)):
            # Covers both a missing key and an explicit null.
            return
        for msg in messages:
            if not isinstance(msg, dict):
                continue
            role = msg.get("role", "")
            if role not in wanted_roles:
                continue
            content = _get_message_content(msg)
            if content:
                collected[role].append(content)

    for span in parsed.session_spans or []:
        if not isinstance(span, dict):
            continue
        attributes = span.get("attributes")
        if not isinstance(attributes, dict):
            # "attributes": null or any non-dict value carries no records.
            continue
        log_records_raw = attributes.get("_eval_log_records")
        if not log_records_raw:
            continue

        # Records may arrive already decoded or as a JSON-encoded string.
        if isinstance(log_records_raw, str):
            try:
                log_records = json.loads(log_records_raw)
            except (json.JSONDecodeError, TypeError):
                logger.debug("Failed to parse _eval_log_records as JSON")
                continue
        else:
            log_records = log_records_raw

        if not isinstance(log_records, list):
            continue

        for record in log_records:
            if not isinstance(record, dict):
                continue

            if parsed.target_trace_id:
                record_trace_id = record.get("traceId") or record.get("trace_id")
                # Records without any trace id are kept; only records that
                # positively belong to a different trace are dropped.
                if record_trace_id and record_trace_id != parsed.target_trace_id:
                    continue

            body = record.get("body", {})
            if not isinstance(body, dict):
                continue

            collect(body.get("input"), ("user",))
            collect(body.get("output"), ("assistant", "tool"))

    fields: Dict[str, Any] = {}

    if collected["user"]:
        fields["input"] = "\n".join(collected["user"])
    if collected["assistant"]:
        fields["actual_output"] = "\n".join(collected["assistant"])
    if collected["tool"]:
        fields["retrieval_context"] = collected["tool"]
        # Separate list so mutating one field does not silently change the other.
        fields["context"] = list(collected["tool"])

    references = parsed.reference_inputs
    if isinstance(references, list) and references and isinstance(references[0], dict):
        expected = references[0].get("expectedResponse")
        if expected:
            fields["expected_output"] = expected

    return fields
|
|
||
|
|
||
| class _ExecutionTimeout(Exception): | ||
| """Raised when execution exceeds the configured timeout.""" | ||
|
|
||
|
|
||
| def _error_response(code: str, message: str) -> Dict[str, str]: | ||
| """Build a standardized error response dict.""" | ||
| return {"errorCode": code, "errorMessage": message} | ||
|
|
||
|
|
||
class BaseAdapter(abc.ABC):
    """Base adapter for evaluation framework integrations.

    Subclasses must implement execute(fields), which runs the actual evaluation
    logic and returns a response dict with "value", "label" and "explanation"
    keys. They may also override validate_fields(fields) to reject events that
    lack the fields their metric needs.

    Calling the adapter does not let ordinary exceptions escape: parsing,
    field-extraction and execution failures are all converted into an error
    response dict.
    """

    DEFAULT_TIMEOUT = 290

    def __init__(
        self,
        field_mapper: Optional[Callable[[Dict[str, Any]], Dict[str, Any]]] = None,
        timeout: Optional[int] = None,
    ):
        """Initialize the adapter.

        Args:
            field_mapper: Optional callable that receives the raw Lambda event and
                returns a dict of field values. Bypasses default span extraction.
            timeout: Maximum seconds to allow for execute(). Defaults to 290
                (slightly under Lambda's 300s max). A value <= 0 disables the
                timeout and runs execute() in the calling thread.
        """
        # NOTE(review): a reviewer left a comment on the field_mapper parameter
        # that begins "Can we make"; the rest of the comment is not visible here.
        self.field_mapper = field_mapper
        self.timeout = timeout if timeout is not None else self.DEFAULT_TIMEOUT

    def __call__(self, event: Union[Dict[str, Any], EvaluatorInput], context: Any = None) -> Dict[str, Any]:
        """Handle a Lambda invocation.

        Args:
            event: Either a raw Lambda event dict or an EvaluatorInput instance
                from bedrock_agentcore.evaluation.custom_code_based_evaluators.models.
            context: Lambda context object (unused).

        Returns:
            Success: {"value": float, "label": str, "explanation": str}
            Error: {"errorCode": str, "errorMessage": str}
        """
        # NOTE(review): a reviewer observed that ParsedEvaluationEvent and
        # EvaluatorInput do the same job (the branch below copies one into the
        # other field-for-field) and proposed requiring adapters to run inside
        # the @code_based_evaluators decorator so that it owns input/output
        # validation. Not applied here because it changes the public contract.
        try:
            if isinstance(event, EvaluatorInput):
                parsed = ParsedEvaluationEvent(
                    evaluation_level=event.evaluation_level,
                    session_spans=event.session_spans,
                    target_trace_id=event.target_trace_id,
                    target_span_id=event.target_span_id,
                    reference_inputs=getattr(event, "reference_inputs", []) or [],
                )
            else:
                parsed = ParsedEvaluationEvent.from_lambda_event(event)
        except (KeyError, IndexError, TypeError, AttributeError) as e:
            # AttributeError covers wrongly-typed sections, e.g. an
            # evaluationTarget that is a list and therefore has no .get().
            logger.error("Failed to parse evaluation event: %s", e)
            return _error_response("INVALID_EVENT", f"Failed to parse evaluation event: {e}")

        try:
            fields = self._extract_fields(parsed)
        except ValueError as e:
            logger.error("Missing required fields: %s", e)
            return _error_response("MISSING_REQUIRED_FIELD", str(e))
        except Exception as e:
            # A user-supplied field_mapper (or malformed span data) can raise
            # anything; keep the "always returns a response dict" guarantee.
            logger.error("Field extraction failed: %s", e, exc_info=True)
            return _error_response("INVALID_EVENT", f"Failed to extract fields from evaluation event: {e}")

        try:
            result = self._execute_with_timeout(fields)
        except _ExecutionTimeout:
            return _error_response(
                "METRIC_TIMEOUT",
                f"{type(self).__name__} exceeded {self.timeout}s timeout.",
            )
        except Exception as e:
            logger.error("Execution failed: %s", e, exc_info=True)
            return _error_response("METRIC_ERROR", f"{type(self).__name__} failed: {e}")

        # NOTE(review): a reviewer asked for EvaluatorOutput (from the
        # code-based evaluator models in this repo) to be returned here instead
        # of a plain dict. Not applied: it would change the return type.
        return result

    def _extract_fields(self, parsed: ParsedEvaluationEvent) -> Dict[str, Any]:
        """Extract fields from event, using field_mapper if provided.

        When a field_mapper is configured, it is handed a reconstructed raw
        event and its result is returned as-is; validate_fields() is not
        applied to it. Otherwise fields are extracted from the spans and
        validated.

        Raises:
            ValueError: If validate_fields() rejects the extracted fields.
        """
        if self.field_mapper is not None:
            raw_event = {
                "evaluationLevel": parsed.evaluation_level,
                "evaluationInput": {"sessionSpans": parsed.session_spans},
                "evaluationTarget": {
                    "traceIds": [parsed.target_trace_id] if parsed.target_trace_id else [],
                    "spanIds": [parsed.target_span_id] if parsed.target_span_id else [],
                },
                "evaluationReferenceInputs": parsed.reference_inputs,
            }
            return self.field_mapper(raw_event)

        fields = extract_fields_from_spans(parsed)
        self.validate_fields(fields)
        return fields

    def validate_fields(self, fields: Dict[str, Any]) -> None:
        """Validate that required fields are present.

        Override in subclasses to enforce field requirements by raising
        ValueError. Default implementation does nothing.
        """
        # NOTE(review): a reviewer asked for this to be an @abstractmethod so
        # that a subclass cannot silently skip validation. Left as a no-op
        # default because making it abstract would break any existing subclass
        # that does not override it.

    @abc.abstractmethod
    def execute(self, fields: Dict[str, Any]) -> Dict[str, Any]:
        """Run the evaluation and return the response dict.

        Args:
            fields: Extracted field dict with keys like "input", "actual_output", etc.

        Returns:
            {"value": float, "label": str, "explanation": str}
        """

    def _execute_with_timeout(self, fields: Dict[str, Any]) -> Dict[str, Any]:
        """Run execute() with a thread-based timeout.

        Args:
            fields: Extracted field dict passed through to execute().

        Returns:
            Whatever execute() returns.

        Raises:
            _ExecutionTimeout: If execute() is still running after self.timeout seconds.
            RuntimeError: If the worker thread ended without producing a result
                or an exception.
            Exception: Any exception raised by execute() is re-raised here.
        """
        if self.timeout <= 0:
            return self.execute(fields)

        result_holder: list = []
        exception_holder: list = []

        def target():
            try:
                result_holder.append(self.execute(fields))
            except Exception as e:
                exception_holder.append(e)

        thread = threading.Thread(target=target, daemon=True)
        thread.start()
        thread.join(timeout=self.timeout)

        # NOTE(review): join() returning does not stop the worker. After a
        # timeout the daemon thread keeps running, so a reused Lambda container
        # can still be executing a previous invocation's evaluation. A reviewer
        # proposed dropping the thread machinery and relying on the Lambda
        # timeout instead; not applied here because it would make the timeout
        # argument a no-op.
        if thread.is_alive():
            raise _ExecutionTimeout()

        if exception_holder:
            raise exception_holder[0]

        if not result_holder:
            # The worker died from something other than an Exception (e.g.
            # SystemExit). Report that instead of a bare IndexError.
            raise RuntimeError("execute() terminated without returning a result")

        return result_holder[0]
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| """DeepEval integration for AgentCore Evaluation.""" | ||
|
|
||
| from bedrock_agentcore.evaluation.integrations.deepeval.adapter import DeepEvalAdapter, DeepEvalHandler | ||
|
|
||
| __all__ = ["DeepEvalAdapter", "DeepEvalHandler"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The score >= 0.5 pass threshold is hardcoded. DeepEval honors the metric's own threshold, but Autoevals scorers have no universal 0.5 cutoff, so this will mislabel non-binary scorers. Let's make it configurable.