Skip to content
21 changes: 21 additions & 0 deletions backend/usage_v2/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from datetime import date, datetime, time, timedelta
from typing import Any

from account_usage.models import PageUsage
from django.db.models import Count, QuerySet, Sum
from django.utils import timezone
from rest_framework.exceptions import APIException, ValidationError
Expand All @@ -23,6 +24,26 @@


class UsageHelper:
@staticmethod
def get_aggregated_pages_processed(run_id: str) -> int | None:
    """Retrieve aggregated pages processed for the given run_id.

    This is a best-effort lookup: any error raised while querying is logged
    and reported to the caller as ``None`` instead of being propagated.

    Args:
        run_id (str): The identifier for the page usage (file execution ID).

    Returns:
        int | None: Total pages processed, or None if no records were found
            or the aggregation failed.
    """
    try:
        # Sum() over an empty queryset already evaluates to None, so a
        # separate exists() check would only add a second database query.
        result = PageUsage.objects.filter(run_id=run_id).aggregate(
            total_pages=Sum("pages_processed")
        )
    except Exception as e:
        logger.error(f"Error aggregating pages processed for run_id {run_id}: {e}")
        return None
    return result.get("total_pages")
Comment thread
pk-zipstack marked this conversation as resolved.
Outdated

@staticmethod
def get_aggregated_token_count(run_id: str) -> dict:
"""Retrieve aggregated token counts for the given run_id.
Expand Down
3 changes: 3 additions & 0 deletions backend/workflow_manager/endpoint_v2/destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,9 @@ def get_combined_metadata(self) -> dict[str, Any]:

# Combine both metadata
workflow_metadata["usage"] = usage_metadata
workflow_metadata["total_pages_processed"] = (
UsageHelper.get_aggregated_pages_processed(run_id=file_execution_id)
)

return workflow_metadata

Expand Down
5 changes: 5 additions & 0 deletions backend/workflow_manager/execution/serializer/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ class ExecutionSerializer(serializers.ModelSerializer):
pipeline_name = serializers.SerializerMethodField()
successful_files = serializers.SerializerMethodField()
failed_files = serializers.SerializerMethodField()
aggregated_total_pages_processed = serializers.SerializerMethodField()
execution_time = serializers.ReadOnlyField(source="pretty_execution_time")

class Meta:
Expand All @@ -31,3 +32,7 @@ def get_successful_files(self, obj: WorkflowExecution) -> int:
def get_failed_files(self, obj: WorkflowExecution) -> int:
    """Count the file executions of ``obj`` whose status is ERROR."""
    error_status = ExecutionStatus.ERROR.value
    failed_executions = obj.file_executions.filter(status=error_status)
    return failed_executions.count()

def get_aggregated_total_pages_processed(self, obj: WorkflowExecution) -> int | None:
    """Expose the execution's ``aggregated_total_pages_processed`` value.

    The serializer field simply forwards whatever that attribute on ``obj``
    evaluates to (an integer total, or None).
    """
    total_pages = obj.aggregated_total_pages_processed
    return total_pages
6 changes: 6 additions & 0 deletions backend/workflow_manager/workflow_v2/file_execution_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from tool_instance_v2.constants import ToolInstanceKey
from tool_instance_v2.models import ToolInstance
from tool_instance_v2.tool_instance_helper import ToolInstanceHelper
from usage_v2.helper import UsageHelper
from utils.constants import Account
from utils.local_context import StateStore

Expand Down Expand Up @@ -1035,6 +1036,11 @@ def _process_final_output(

if destination.is_api:
execution_metadata = destination.get_metadata(file_history)
if execution_metadata is not None:
total_pages = UsageHelper.get_aggregated_pages_processed(
run_id=file_execution_id
)
execution_metadata["total_pages_processed"] = total_pages
if cls._should_create_file_history(
destination=destination,
file_history=file_history,
Expand Down
21 changes: 21 additions & 0 deletions backend/workflow_manager/workflow_v2/models/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import uuid
from datetime import timedelta

from account_usage.models import PageUsage
from api_v2.models import APIDeployment
from django.core.exceptions import ObjectDoesNotExist
from django.db import models
Expand Down Expand Up @@ -258,6 +259,26 @@ def aggregated_usage_cost(self) -> float | None:

return total_cost

@property
def aggregated_total_pages_processed(self) -> int | None:
    """Retrieve aggregated total pages processed for this execution.

    Page usage rows are matched by comparing ``run_id`` against the string
    form of each related file execution ID.

    Returns:
        int | None: Total pages processed across all file executions,
            or None if no page usage data exists.
    """
    run_ids = [
        str(file_execution_id)
        for file_execution_id in self.file_executions.values_list("id", flat=True)
    ]
    if not run_ids:
        # No file executions at all: skip the page usage query entirely.
        return None

    # Sum() over an empty queryset already evaluates to None, so a separate
    # exists() check would only add another database query.
    result = PageUsage.objects.filter(run_id__in=run_ids).aggregate(
        total_pages=Sum("pages_processed")
    )
    return result.get("total_pages")

@property
def is_completed(self) -> bool:
return ExecutionStatus.is_completed(self.status)
Expand Down