Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
242 changes: 23 additions & 219 deletions src/lightspeed_evaluation/core/output/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,21 @@
)
from lightspeed_evaluation.core.models.statistics import (
AgentTokenStats,
ConversationStats,
MetricStats,
NumericStats,
OverallStats,
StreamingStats,
TagStats,
)
from lightspeed_evaluation.core.models.quality import QualityReport
from lightspeed_evaluation.core.storage import FileBackendConfig, get_file_config
from lightspeed_evaluation.core.output.visualization import GraphGenerator
from lightspeed_evaluation.core.output.serializers import (
numeric_stats_to_dict,
streaming_stats_to_dict,
summary_to_detailed_stats_dict,
overall_to_basic_stats_dict,
result_to_json_dict,
metric_stats_to_dict,
conversation_stats_to_dict,
tag_stats_to_dict,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -229,7 +234,7 @@ def _create_graphs(

# Convert summary by_metric/by_conversation/by_tag to dict format
# that the GraphGenerator expects
detailed_stats = _summary_to_detailed_stats_dict(summary)
detailed_stats = summary_to_detailed_stats_dict(summary)

graph_generator = GraphGenerator(
output_dir=str(self.output_dir), figsize=figsize, dpi=dpi
Expand Down Expand Up @@ -312,7 +317,7 @@ def _generate_json_summary_from_model(
"total_evaluations": len(summary.results),
"summary_stats": summary_stats,
"configuration": self._build_config_dict(),
"results": [_result_to_json_dict(r) for r in summary.results],
"results": [result_to_json_dict(r) for r in summary.results],
}

with open(json_file, "w", encoding="utf-8") as f:
Expand Down Expand Up @@ -391,7 +396,7 @@ def _generate_quality_score_report(
for metric_id, stats in quality_report.extra_metrics.items()
},
"agent_latency_stats": (
_numeric_stats_to_dict(quality_report.agent_latency_stats)
numeric_stats_to_dict(quality_report.agent_latency_stats)
if quality_report.agent_latency_stats is not None
else None
),
Expand Down Expand Up @@ -426,7 +431,7 @@ def _generate_text_summary_from_model(
txt_file = out / f"{base_filename}_summary.txt"

# Build compatible dicts from summary model
basic_stats = _overall_to_basic_stats_dict(summary.overall)
basic_stats = overall_to_basic_stats_dict(summary.overall)
api_tokens = (
{
"total_api_input_tokens": summary.agent_token_usage.total_api_input_tokens,
Expand All @@ -441,9 +446,9 @@ def _generate_text_summary_from_model(
}
)
streaming_stats = (
_streaming_stats_to_dict(summary.streaming) if summary.streaming else {}
streaming_stats_to_dict(summary.streaming) if summary.streaming else {}
)
detailed_stats = _summary_to_detailed_stats_dict(summary)
detailed_stats = summary_to_detailed_stats_dict(summary)

with open(txt_file, "w", encoding="utf-8") as f:
f.write("LSC Evaluation Framework - Summary Report\n")
Expand Down Expand Up @@ -516,7 +521,7 @@ def _write_agent_latency_stats(
if agent_latency is None:
return # No API latency data available

stats_dict = _numeric_stats_to_dict(agent_latency)
stats_dict = numeric_stats_to_dict(agent_latency)
self._write_numeric_stats(
f,
"API Latency (seconds):\n" + "-" * 20,
Expand Down Expand Up @@ -768,7 +773,7 @@ def _build_json_summary_stats(summary: EvaluationSummary) -> dict[str, Any]:
api_total = agent_token_usage.total_api_tokens if agent_token_usage else 0

overall_stats = {
**_overall_to_basic_stats_dict(overall),
**overall_to_basic_stats_dict(overall),
"total_api_input_tokens": (
agent_token_usage.total_api_input_tokens if agent_token_usage else 0
),
Expand All @@ -781,218 +786,17 @@ def _build_json_summary_stats(summary: EvaluationSummary) -> dict[str, Any]:

result: dict[str, Any] = {
"overall": overall_stats,
"by_metric": _metric_stats_to_dict(summary.by_metric),
"by_conversation": _conversation_stats_to_dict(summary.by_conversation),
"by_tag": _tag_stats_to_dict(summary.by_tag),
"by_metric": metric_stats_to_dict(summary.by_metric),
"by_conversation": conversation_stats_to_dict(summary.by_conversation),
"by_tag": tag_stats_to_dict(summary.by_tag),
}

if summary.agent_latency_stats is not None:
result["agent_latency_stats"] = _numeric_stats_to_dict(
result["agent_latency_stats"] = numeric_stats_to_dict(
summary.agent_latency_stats
)

if summary.streaming is not None:
result["streaming_performance"] = _streaming_stats_to_dict(summary.streaming)
result["streaming_performance"] = streaming_stats_to_dict(summary.streaming)

return result


def _result_to_json_dict(r: EvaluationResult) -> dict[str, Any]:
"""Convert a single EvaluationResult to JSON-serializable dict.

Args:
r: The evaluation result to convert.

Returns:
Dictionary matching the existing JSON summary result format.
"""
return {
"conversation_group_id": r.conversation_group_id,
"tag": r.tag,
"turn_id": r.turn_id,
"metric_identifier": r.metric_identifier,
"result": r.result,
"score": r.score,
"threshold": r.threshold,
"execution_time": r.execution_time,
"evaluation_latency": r.evaluation_latency,
"judge_llm_input_tokens": r.judge_llm_input_tokens,
"judge_llm_output_tokens": r.judge_llm_output_tokens,
"judge_scores": (
[js.model_dump() for js in r.judge_scores] if r.judge_scores else None
),
"time_to_first_token": r.time_to_first_token,
"streaming_duration": r.streaming_duration,
"agent_latency": r.agent_latency,
"tokens_per_second": r.tokens_per_second,
}


def _overall_to_basic_stats_dict(
overall: OverallStats,
) -> dict[str, Any]:
"""Convert OverallStats to the dict format expected by text output.

Args:
overall: OverallStats model instance.

Returns:
Dictionary with keys matching the original calculate_basic_stats format.
"""
return {
"TOTAL": overall.total,
"PASS": overall.passed,
"FAIL": overall.failed,
"ERROR": overall.error,
"SKIPPED": overall.skipped,
"pass_rate": overall.pass_rate,
"fail_rate": overall.fail_rate,
"error_rate": overall.error_rate,
"skipped_rate": overall.skipped_rate,
"total_judge_llm_input_tokens": overall.total_judge_llm_input_tokens,
"total_judge_llm_output_tokens": overall.total_judge_llm_output_tokens,
"total_judge_llm_tokens": overall.total_judge_llm_tokens,
"total_embedding_tokens": overall.total_embedding_tokens,
}


def _group_stats_to_dict(
stats: MetricStats | ConversationStats | TagStats,
) -> dict[str, Any]:
"""Convert a group stats model to the dict format for text output.

Args:
stats: MetricStats, ConversationStats, or TagStats instance.

Returns:
Dictionary with lowercase keys matching original detailed stats format.
"""
result: dict[str, Any] = {
"pass": stats.passed,
"fail": stats.failed,
"error": stats.error,
"skipped": stats.skipped,
"pass_rate": stats.pass_rate,
"fail_rate": stats.fail_rate,
"error_rate": stats.error_rate,
"skipped_rate": stats.skipped_rate,
}
if (
isinstance(stats, (MetricStats, TagStats))
and stats.score_statistics is not None
):
score_stats = stats.score_statistics
result["score_statistics"] = {
"count": score_stats.count,
"mean": score_stats.mean,
"median": score_stats.median,
"std": score_stats.std,
"min": score_stats.min_score,
"max": score_stats.max_score,
"confidence_interval": (
score_stats.confidence_interval.model_dump()
if score_stats.confidence_interval is not None
else None
),
}
return result


def _metric_stats_to_dict(
by_metric: dict[str, MetricStats],
) -> dict[str, dict[str, Any]]:
"""Convert by_metric model dict to legacy dict format.

Args:
by_metric: Dictionary mapping metric IDs to MetricStats models.

Returns:
Dictionary in the original detailed stats format.
"""
return {k: _group_stats_to_dict(v) for k, v in by_metric.items()}


def _conversation_stats_to_dict(
by_conversation: dict[str, ConversationStats],
) -> dict[str, dict[str, Any]]:
"""Convert by_conversation model dict to legacy dict format.

Args:
by_conversation: Dictionary mapping conversation IDs to ConversationStats.

Returns:
Dictionary in the original detailed stats format.
"""
return {k: _group_stats_to_dict(v) for k, v in by_conversation.items()}


def _tag_stats_to_dict(
by_tag: dict[str, TagStats],
) -> dict[str, dict[str, Any]]:
"""Convert by_tag model dict to legacy dict format.

Args:
by_tag: Dictionary mapping tags to TagStats models.

Returns:
Dictionary in the original detailed stats format.
"""
return {k: _group_stats_to_dict(v) for k, v in by_tag.items()}


def _summary_to_detailed_stats_dict(
summary: EvaluationSummary,
) -> dict[str, Any]:
"""Convert EvaluationSummary to the detailed stats dict format.

This produces a dictionary with by_metric, by_conversation, by_tag keys
matching the format from compute_detailed_stats().

Args:
summary: The EvaluationSummary instance.

Returns:
Dictionary matching the original detailed stats format.
"""
return {
"by_metric": _metric_stats_to_dict(summary.by_metric),
"by_conversation": _conversation_stats_to_dict(summary.by_conversation),
"by_tag": _tag_stats_to_dict(summary.by_tag),
}


def _streaming_stats_to_dict(streaming: StreamingStats) -> dict[str, Any]:
"""Convert StreamingStats model to the dict format for text output.

Args:
streaming: StreamingStats model instance.

Returns:
Dictionary matching the original streaming stats format.
"""
result: dict[str, Any] = {}
for field_name in (
"time_to_first_token",
"streaming_duration",
"tokens_per_second",
):
numeric = getattr(streaming, field_name, None)
if numeric is not None:
result[field_name] = _numeric_stats_to_dict(numeric)
else:
result[field_name] = {"count": 0}
return result


def _numeric_stats_to_dict(numeric: NumericStats) -> dict[str, Any]:
"""Convert NumericStats model to dict format for text output."""
return {
"count": numeric.count,
"mean": numeric.mean,
"median": numeric.median,
"std": numeric.std,
"min": numeric.min_value,
"max": numeric.max_value,
"p95": numeric.p95,
"p99": numeric.p99,
}
Loading
Loading