Skip to content
28 changes: 28 additions & 0 deletions backend/usage_v2/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from datetime import date, datetime, time, timedelta
from typing import Any

from account_usage.models import PageUsage
from django.db.models import Count, QuerySet, Sum
from django.utils import timezone
from rest_framework.exceptions import APIException, ValidationError
Expand All @@ -23,6 +24,33 @@


class UsageHelper:
@staticmethod
def get_aggregated_pages_processed(
run_id: str | None = None,
run_ids: list[str] | None = None,
) -> int | None:
"""Retrieve aggregated pages processed for given run_id(s).

Provide either a single run_id or a list of run_ids.

Args:
run_id: Single file execution ID.
run_ids: List of file execution IDs.

Returns:
int | None: Total pages processed, or None if no records found.
"""
if run_id:
queryset = PageUsage.objects.filter(run_id=run_id)
elif run_ids:
queryset = PageUsage.objects.filter(run_id__in=run_ids)
else:
return None
if not queryset.exists():
return None
result = queryset.aggregate(total_pages=Sum("pages_processed"))
return result.get("total_pages")

@staticmethod
def get_aggregated_token_count(run_id: str) -> dict:
"""Retrieve aggregated token counts for the given run_id.
Expand Down
3 changes: 3 additions & 0 deletions backend/workflow_manager/endpoint_v2/destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,9 @@ def get_combined_metadata(self) -> dict[str, Any]:

# Combine both metadata
workflow_metadata["usage"] = usage_metadata
workflow_metadata["total_pages_processed"] = (
UsageHelper.get_aggregated_pages_processed(run_id=file_execution_id)
)

return workflow_metadata

Expand Down
5 changes: 5 additions & 0 deletions backend/workflow_manager/execution/serializer/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ class ExecutionSerializer(serializers.ModelSerializer):
pipeline_name = serializers.SerializerMethodField()
successful_files = serializers.SerializerMethodField()
failed_files = serializers.SerializerMethodField()
aggregated_total_pages_processed = serializers.SerializerMethodField()
execution_time = serializers.ReadOnlyField(source="pretty_execution_time")

class Meta:
Expand All @@ -31,3 +32,7 @@ def get_successful_files(self, obj: WorkflowExecution) -> int:
def get_failed_files(self, obj: WorkflowExecution) -> int:
"""Return the count of failed executed files"""
return obj.file_executions.filter(status=ExecutionStatus.ERROR.value).count()

def get_aggregated_total_pages_processed(self, obj: WorkflowExecution) -> int | None:
"""Return the total pages processed across all file executions."""
return obj.aggregated_total_pages_processed
17 changes: 17 additions & 0 deletions backend/workflow_manager/workflow_v2/models/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from pipeline_v2.models import Pipeline
from tags.models import Tag
from usage_v2.constants import UsageKeys
from usage_v2.helper import UsageHelper
from usage_v2.models import Usage
from utils.common_utils import CommonUtils
from utils.models.base_model import BaseModel
Expand Down Expand Up @@ -259,6 +260,22 @@ def aggregated_usage_cost(self) -> float | None:

return total_cost

@property
def aggregated_total_pages_processed(self) -> int | None:
"""Retrieve aggregated total pages processed for this execution.

Returns:
int | None: Total pages processed across all file executions,
or None if no page usage data exists.
"""
file_execution_ids = list(self.file_executions.values_list("id", flat=True))
if not file_execution_ids:
return None

return UsageHelper.get_aggregated_pages_processed(
run_ids=[str(fid) for fid in file_execution_ids]
)

@property
def is_completed(self) -> bool:
return ExecutionStatus.is_completed(self.status)
Expand Down