Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,4 @@ local_settings.py
Dockerfile
CLAUDE.md
.omc/
.deepeval/
4 changes: 4 additions & 0 deletions src/bedrock_agentcore/evaluation/integrations/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
"""AgentCore Evaluation integrations."""

from bedrock_agentcore.evaluation.integrations.base import BaseAdapter, ParsedEvaluationEvent

__all__ = ["BaseAdapter", "ParsedEvaluationEvent"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Autoevals integration for AgentCore Evaluation."""

from bedrock_agentcore.evaluation.integrations.autoevals.adapter import AutoevalsAdapter

__all__ = ["AutoevalsAdapter"]
72 changes: 72 additions & 0 deletions src/bedrock_agentcore/evaluation/integrations/autoevals/adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""Autoevals adapter for AgentCore evaluation integrations."""

import logging
from typing import Any, Callable, Dict, Optional

from bedrock_agentcore.evaluation.integrations.base import BaseAdapter

logger = logging.getLogger(__name__)


class AutoevalsAdapter(BaseAdapter):
    """Adapter that runs an Autoevals scorer against AgentCore evaluation events.

    Example::

        from autoevals import Factuality

        scorer = Factuality()
        handler = AutoevalsAdapter(scorer=scorer)

        # Use as Lambda handler
        def lambda_handler(event, context):
            return handler(event, context)
    """

    # Cutoff used when the caller does not supply one; kept at 0.5 so existing
    # callers see the same Pass/Fail labels as before.
    DEFAULT_THRESHOLD = 0.5

    def __init__(
        self,
        scorer: Any,
        field_mapper: Optional[Callable[[Dict[str, Any]], Dict[str, Any]]] = None,
        timeout: Optional[int] = None,
        threshold: Optional[float] = None,
    ):
        """Initialize the adapter.

        Args:
            scorer: An Autoevals scorer instance (e.g. Factuality(), ClosedQA()).
            field_mapper: Optional callable that receives the raw Lambda event and
                returns a dict of field values. Bypasses default span extraction.
            timeout: Maximum seconds to allow for scorer.eval(). Defaults to the
                base adapter's default when None.
            threshold: Minimum score that is labelled "Pass". Defaults to
                DEFAULT_THRESHOLD (0.5) when None.
        """
        super().__init__(field_mapper=field_mapper, timeout=timeout)
        self.scorer = scorer
        self.threshold = self.DEFAULT_THRESHOLD if threshold is None else threshold

    def validate_fields(self, fields: Dict[str, Any]) -> None:
        """Validate that input and actual_output are present and non-empty.

        Raises:
            ValueError: If either field is missing or falsy.
        """
        missing = [name for name in ("input", "actual_output") if not fields.get(name)]
        if missing:
            scorer_name = type(self.scorer).__name__
            raise ValueError(
                f"Field(s) {missing} required by {scorer_name} but not found in evaluation event. "
                f"Provide a field_mapper or ensure spans contain the necessary data."
            )

    def execute(self, fields: Dict[str, Any]) -> Dict[str, Any]:
        """Run the Autoevals scorer and return formatted results.

        Args:
            fields: Extracted fields; "input" and "actual_output" are passed to
                the scorer, and "expected_output" is passed as "expected" when
                it is present and truthy.

        Returns:
            {"value": score, "label": "Pass" | "Fail", "explanation": str}
        """
        kwargs: Dict[str, Any] = {
            "input": fields.get("input", ""),
            "output": fields.get("actual_output", ""),
        }
        if fields.get("expected_output"):
            kwargs["expected"] = fields["expected_output"]

        result = self.scorer.eval(**kwargs)

        score = result.score
        # Autoevals scorers share no universal cutoff, so the pass threshold is
        # configurable per adapter instead of being fixed at 0.5. A missing
        # score is always labelled "Fail".
        passed = score is not None and score >= self.threshold
        label = "Pass" if passed else "Fail"

        # metadata may be absent, None, or not a mapping; only read the
        # rationale when it is actually a dict.
        metadata = getattr(result, "metadata", None)
        explanation = (metadata.get("rationale", "") or "") if isinstance(metadata, dict) else ""

        return {"value": score, "label": label, "explanation": explanation}
302 changes: 302 additions & 0 deletions src/bedrock_agentcore/evaluation/integrations/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,302 @@
"""Base adapter for AgentCore evaluation integrations."""

import abc
import json
import logging
import threading
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Union

from bedrock_agentcore.evaluation.custom_code_based_evaluators.models import EvaluatorInput

logger = logging.getLogger(__name__)


@dataclass
class ParsedEvaluationEvent:
    """Structured view of the AgentCore Lambda evaluation event.

    Carries the evaluation level, the session spans, the first target trace
    and span id (None when none were supplied) and the reference inputs.
    """

    evaluation_level: str
    session_spans: List[Dict[str, Any]]
    target_trace_id: Optional[str] = None
    target_span_id: Optional[str] = None
    reference_inputs: List[Dict[str, Any]] = field(default_factory=list)

    @classmethod
    def from_lambda_event(cls, event: Dict[str, Any]) -> "ParsedEvaluationEvent":
        """Build a ParsedEvaluationEvent from a raw Lambda event dict.

        Args:
            event: Raw Lambda event payload from the evaluation service.

        Returns:
            ParsedEvaluationEvent populated from the payload. Only the first
            entry of "traceIds" / "spanIds" is kept.

        Raises:
            KeyError: If "evaluationInput", "evaluationLevel" or
                "sessionSpans" is missing.
        """
        # Lookups happen in the same order as the fields are consumed so the
        # first missing required key is the one reported.
        input_section = event["evaluationInput"]
        target_section = event.get("evaluationTarget") or {}
        trace_candidates = target_section.get("traceIds") or []
        span_candidates = target_section.get("spanIds") or []

        level = event["evaluationLevel"]
        spans = input_section["sessionSpans"]

        first_trace = None
        if trace_candidates:
            first_trace = trace_candidates[0]

        first_span = None
        if span_candidates:
            first_span = span_candidates[0]

        references = event.get("evaluationReferenceInputs") or []

        return cls(
            evaluation_level=level,
            session_spans=spans,
            target_trace_id=first_trace,
            target_span_id=first_span,
            reference_inputs=references,
        )


def _get_message_content(message: Any) -> str:
"""Extract text content from a message object.

Message content can be a dict with a "content" or "message" key, or a plain string.
Handles one level of nesting (e.g. {"content": {"content": "text"}}).
"""
if isinstance(message, str):
return message
if isinstance(message, dict):
for key in ("content", "message"):
if key in message:
val = message[key]
if isinstance(val, str):
return val
if isinstance(val, dict):
return _get_message_content(val)
return str(val)
return ""


def _collect_role_messages(section: Any, buckets: Dict[str, List[str]]) -> None:
    """Append the text of each message in *section* to the bucket for its role.

    Messages whose role has no bucket, or whose extracted text is empty, are
    ignored. A section that is not a dict, or whose "messages" value is not a
    list/tuple, contributes nothing.
    """
    if not isinstance(section, dict):
        return
    messages = section.get("messages")
    if not isinstance(messages, (list, tuple)):
        return
    for msg in messages:
        if not isinstance(msg, dict):
            continue
        role = msg.get("role")
        if not isinstance(role, str) or role not in buckets:
            continue
        content = _get_message_content(msg)
        if content:
            buckets[role].append(content)


def extract_fields_from_spans(
    parsed: ParsedEvaluationEvent,
) -> Dict[str, Any]:
    """Extract evaluation fields from AgentCore session spans.

    Parses _eval_log_records from span attributes, filters by target_trace_id,
    and extracts messages by role:
    - input ← input messages where role=="user"
    - actual_output ← output messages where role=="assistant"
    - retrieval_context ← output messages where role=="tool"
    - context ← same values as retrieval_context (a separate list)
    - expected_output ← evaluationReferenceInputs[0].expectedResponse

    Records that carry a trace id different from target_trace_id are skipped;
    records without a trace id are kept. Malformed spans, attributes, records
    and message containers are skipped rather than raising.

    NOTE(review): it is unclear which OTel agent semantic convention or SDK
    emits the "_eval_log_records" attribute — confirm the producer.

    Args:
        parsed: The parsed evaluation event.

    Returns:
        Dict containing only the fields for which data was found.
    """
    user_messages: List[str] = []
    assistant_messages: List[str] = []
    tool_messages: List[str] = []
    input_buckets = {"user": user_messages}
    output_buckets = {"assistant": assistant_messages, "tool": tool_messages}

    for span in parsed.session_spans or []:
        if not isinstance(span, dict):
            continue
        attributes = span.get("attributes") or {}
        if not isinstance(attributes, dict):
            continue
        log_records = attributes.get("_eval_log_records")
        if not log_records:
            continue

        # The attribute may arrive JSON-encoded or already decoded.
        if isinstance(log_records, str):
            try:
                log_records = json.loads(log_records)
            except (json.JSONDecodeError, TypeError):
                logger.debug("Failed to parse _eval_log_records as JSON")
                continue

        if not isinstance(log_records, list):
            continue

        for record in log_records:
            if not isinstance(record, dict):
                continue

            if parsed.target_trace_id:
                record_trace_id = record.get("traceId") or record.get("trace_id")
                if record_trace_id and record_trace_id != parsed.target_trace_id:
                    continue

            body = record.get("body", {})
            if not isinstance(body, dict):
                continue

            _collect_role_messages(body.get("input"), input_buckets)
            _collect_role_messages(body.get("output"), output_buckets)

    fields: Dict[str, Any] = {}

    if user_messages:
        fields["input"] = "\n".join(user_messages)
    if assistant_messages:
        fields["actual_output"] = "\n".join(assistant_messages)
    if tool_messages:
        fields["retrieval_context"] = tool_messages
        # Independent copy so mutating one field cannot silently change the other.
        fields["context"] = list(tool_messages)

    if parsed.reference_inputs:
        first_reference = parsed.reference_inputs[0]
        if isinstance(first_reference, dict):
            expected = first_reference.get("expectedResponse")
            if expected:
                fields["expected_output"] = expected

    return fields


class _ExecutionTimeout(Exception):
"""Raised when execution exceeds the configured timeout."""


def _error_response(code: str, message: str) -> Dict[str, str]:
"""Build a standardized error response dict."""
return {"errorCode": code, "errorMessage": message}


class BaseAdapter(abc.ABC):
    """Base adapter for evaluation framework integrations.

    Subclasses must implement execute(fields), which runs the actual
    evaluation logic and returns the response dict
    {"value": ..., "label": ..., "explanation": ...}. They may also override
    validate_fields(fields) to enforce required fields.

    Calling the adapter returns an error dict instead of raising when event
    parsing, field extraction or execution fails.
    """

    DEFAULT_TIMEOUT = 290

    def __init__(
        self,
        field_mapper: Optional[Callable[[Dict[str, Any]], Dict[str, Any]]] = None,
        timeout: Optional[int] = None,
    ):
        """Initialize the adapter.

        Args:
            field_mapper: Optional callable that receives the raw Lambda event and
                returns a dict of field values. Bypasses default span extraction
                and validate_fields().
            timeout: Maximum seconds to allow for execute(). Defaults to 290
                (slightly under Lambda's 300s max). A value <= 0 disables the
                timeout.
        """
        # NOTE(review): a reviewer proposed making extract_fields_from_spans the
        # default field_mapper so there is a single extraction path; not done
        # here because the mapper receives the raw event, not the parsed one.
        self.field_mapper = field_mapper
        self.timeout = timeout if timeout is not None else self.DEFAULT_TIMEOUT

    def __call__(self, event: Union[Dict[str, Any], EvaluatorInput], context: Any = None) -> Dict[str, Any]:
        """Handle a Lambda invocation.

        Args:
            event: Either a raw Lambda event dict or an EvaluatorInput instance
                from bedrock_agentcore.evaluation.custom_code_based_evaluators.models.
            context: Lambda context object (unused).

        Returns:
            Success: {"value": float, "label": str, "explanation": str}
            Error: {"errorCode": str, "errorMessage": str}
        """
        # NOTE(review): reviewers asked whether ParsedEvaluationEvent duplicates
        # EvaluatorInput (and proposed requiring the code-based-evaluator
        # decorator so it owns validation), and asked for EvaluatorOutput as
        # the return type. Both change the public contract and are left open.
        try:
            if isinstance(event, EvaluatorInput):
                parsed = ParsedEvaluationEvent(
                    evaluation_level=event.evaluation_level,
                    session_spans=event.session_spans,
                    target_trace_id=event.target_trace_id,
                    target_span_id=event.target_span_id,
                    reference_inputs=getattr(event, "reference_inputs", []) or [],
                )
            else:
                parsed = ParsedEvaluationEvent.from_lambda_event(event)
        except (KeyError, IndexError, TypeError, AttributeError) as e:
            # AttributeError covers wrongly-typed sections (e.g. a list where a
            # dict is expected) and EvaluatorInput objects missing an attribute.
            logger.error("Failed to parse evaluation event: %s", e)
            return _error_response("INVALID_EVENT", f"Failed to parse evaluation event: {e}")

        try:
            fields = self._extract_fields(parsed)
        except ValueError as e:
            logger.error("Missing required fields: %s", e)
            return _error_response("MISSING_REQUIRED_FIELD", str(e))
        except Exception as e:
            # A failing field_mapper or malformed span data must not escape the
            # handler; report it as an unusable event.
            logger.error("Failed to extract fields: %s", e, exc_info=True)
            return _error_response("INVALID_EVENT", f"Failed to extract fields from evaluation event: {e}")

        try:
            result = self._execute_with_timeout(fields)
        except _ExecutionTimeout:
            return _error_response(
                "METRIC_TIMEOUT",
                f"{type(self).__name__} exceeded {self.timeout}s timeout.",
            )
        except Exception as e:
            logger.error("Execution failed: %s", e, exc_info=True)
            return _error_response("METRIC_ERROR", f"{type(self).__name__} failed: {e}")

        return result

    def _extract_fields(self, parsed: ParsedEvaluationEvent) -> Dict[str, Any]:
        """Extract fields from event, using field_mapper if provided.

        With a field_mapper, the parsed event is rebuilt into the raw Lambda
        event shape and the mapper's result is returned without validation.
        Otherwise fields come from the session spans and are passed through
        validate_fields().
        """
        if self.field_mapper is not None:
            raw_event = {
                "evaluationLevel": parsed.evaluation_level,
                "evaluationInput": {"sessionSpans": parsed.session_spans},
                "evaluationTarget": {
                    "traceIds": [parsed.target_trace_id] if parsed.target_trace_id else [],
                    "spanIds": [parsed.target_span_id] if parsed.target_span_id else [],
                },
                "evaluationReferenceInputs": parsed.reference_inputs,
            }
            return self.field_mapper(raw_event)

        fields = extract_fields_from_spans(parsed)
        self.validate_fields(fields)
        return fields

    def validate_fields(self, fields: Dict[str, Any]) -> None:
        """Validate that required fields are present.

        Override in subclasses to enforce field requirements; raise ValueError
        to have the adapter return a MISSING_REQUIRED_FIELD error.
        Default implementation does nothing.
        """
        # NOTE(review): a reviewer asked for @abc.abstractmethod here so that a
        # subclass cannot silently skip validation. Kept as a no-op because
        # making it abstract would break subclasses that do not override it.

    @abc.abstractmethod
    def execute(self, fields: Dict[str, Any]) -> Dict[str, Any]:
        """Run the evaluation and return the response dict.

        Args:
            fields: Extracted field dict with keys like "input", "actual_output", etc.

        Returns:
            {"value": float, "label": str, "explanation": str}
        """

    def _execute_with_timeout(self, fields: Dict[str, Any]) -> Dict[str, Any]:
        """Run execute() with a thread-based timeout.

        Raises:
            _ExecutionTimeout: If execute() is still running after self.timeout
                seconds.
            Exception: Whatever execute() raised, re-raised in the caller.
            RuntimeError: If the worker ended without a result or an exception.
        """
        if self.timeout <= 0:
            return self.execute(fields)

        result_holder: List[Dict[str, Any]] = []
        exception_holder: List[Exception] = []

        def target() -> None:
            try:
                result_holder.append(self.execute(fields))
            except Exception as e:
                exception_holder.append(e)

        thread = threading.Thread(target=target, daemon=True)
        thread.start()
        thread.join(timeout=self.timeout)

        if thread.is_alive():
            # join() only stops waiting: the worker is not cancelled and keeps
            # running in the background after this returns.
            # NOTE(review): a reviewer flagged that on a reused Lambda container
            # this leftover thread can overlap the next invocation, and proposed
            # dropping the thread and relying on the Lambda timeout instead.
            raise _ExecutionTimeout()

        if exception_holder:
            raise exception_holder[0]

        if not result_holder:
            # The worker exited via something other than Exception; surface a
            # clear error rather than an IndexError.
            raise RuntimeError("execute() terminated without producing a result")

        return result_holder[0]
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""DeepEval integration for AgentCore Evaluation."""

from bedrock_agentcore.evaluation.integrations.deepeval.adapter import DeepEvalAdapter, DeepEvalHandler

__all__ = ["DeepEvalAdapter", "DeepEvalHandler"]
Loading