[FEAT] Expose total_pages_processed in execution API response#1801
pk-zipstack wants to merge 6 commits into main from
Conversation
…adata Surface page usage data from PageUsage model in API responses to support tracking total pages processed per file execution and per workflow execution. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
for more information, see https://pre-commit.ci
Summary by CodeRabbit
Walkthrough

This pull request introduces pages-processed aggregation tracking across workflow executions. A new helper method aggregates PageUsage records by run ID(s), a model property collects run IDs and delegates to this helper, a serializer field exposes the aggregated value, and API metadata is augmented with the total pages processed.
Sequence Diagram

```mermaid
sequenceDiagram
    participant API as API Endpoint
    participant Serializer as ExecutionSerializer
    participant Model as WorkflowExecution
    participant Helper as UsageHelper
    participant DB as PageUsage DB
    API->>Serializer: get_serialized_execution()
    Serializer->>Model: access aggregated_total_pages_processed
    Model->>Model: collect file_execution_ids
    Model->>Helper: get_aggregated_pages_processed(run_ids)
    Helper->>DB: query PageUsage records
    DB-->>Helper: filtered records
    Helper->>Helper: aggregate Sum(pages_processed)
    Helper-->>Model: total_pages | None
    Model-->>Serializer: aggregated value
    Serializer-->>API: serialized response with field
```
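The delegation chain in the diagram can be sketched with a small pure-Python stand-in (the in-memory `PAGE_USAGE` table, `WorkflowExecutionSketch`, and the helper below are illustrative assumptions, not the project's actual Django code):

```python
# In-memory stand-in for the PageUsage table: run_id -> pages_processed.
PAGE_USAGE = {"run-1": 4, "run-2": 6}


def get_aggregated_pages_processed(run_ids):
    """Mimics UsageHelper: sum pages for matching run IDs, None when no records."""
    matched = [PAGE_USAGE[r] for r in run_ids if r in PAGE_USAGE]
    return sum(matched) if matched else None


class WorkflowExecutionSketch:
    """Stand-in for the model's aggregated_total_pages_processed property."""

    def __init__(self, file_execution_ids):
        self.file_execution_ids = file_execution_ids

    @property
    def aggregated_total_pages_processed(self):
        run_ids = [str(fid) for fid in self.file_execution_ids]
        if not run_ids:
            return None
        return get_aggregated_pages_processed(run_ids)


print(WorkflowExecutionSketch(["run-1", "run-2"]).aggregated_total_pages_processed)  # 10
print(WorkflowExecutionSketch([]).aggregated_total_pages_processed)  # None
```

The `None` return for executions with no file runs mirrors `Sum("pages_processed")` over an empty queryset.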
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks: ✅ Passed checks (3 passed)
🧹 Nitpick comments (4)
backend/workflow_manager/workflow_v2/models/execution.py (2)
262-280: `PageUsage.run_id` has no database index, but is now on multiple hot query paths.

The `PageUsage` model only indexes `organization_id`. Both `filter(run_id=run_id)` (in `UsageHelper.get_aggregated_pages_processed`) and `filter(run_id__in=str_ids)` (here) will full-scan the `page_usage` table as it grows. These queries are now triggered per-file in API deployments and per-row in execution list views.

A migration adding a `db_index=True` on `run_id` (or a `Meta.indexes` entry) is recommended:

```python
# In account_usage/models.py – PageUsage.Meta
indexes = [
    models.Index(fields=["organization_id"]),
    models.Index(fields=["run_id"]),  # +
]
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/workflow_manager/workflow_v2/models/execution.py` around lines 262 - 280, Add a database index on PageUsage.run_id to avoid full table scans for queries used by aggregated_total_pages_processed and UsageHelper.get_aggregated_pages_processed: update the PageUsage model Meta to include an index for "run_id" (e.g., add models.Index(fields=["run_id"]) alongside the existing organization_id index) and create/apply a Django migration so the new index is created in the database.
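As a toy illustration of what the suggested `run_id` index buys (plain Python, not Django: a list scan stands in for the unindexed full-table scan, a precomputed dict for the indexed lookup):

```python
from collections import defaultdict

# Fake page_usage rows; without an index, every filter(run_id=...) scans them all.
rows = [{"run_id": f"run-{i % 100}", "pages_processed": 2} for i in range(1000)]


def total_pages_scan(run_id):
    # O(n) per call: the unindexed access pattern.
    return sum(r["pages_processed"] for r in rows if r["run_id"] == run_id)


# An index is built once, then each lookup is cheap: the indexed access pattern.
pages_by_run = defaultdict(int)
for r in rows:
    pages_by_run[r["run_id"]] += r["pages_processed"]

print(total_pages_scan("run-7"), pages_by_run["run-7"])  # both 20
```

Either way the totals agree; the index only changes how much work each per-file and per-row query does.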
274-280: Same redundant `.exists()` + `.aggregate()` double-query as in `UsageHelper`.

`Sum("pages_processed")` on an empty queryset returns `None` for the key, so the `.exists()` check buys nothing. Collapsing to a single `.aggregate()` call saves one round-trip per property access (and this property is called per row in list-view serialization).

♻️ Proposed fix

```diff
-        str_ids = [str(fid) for fid in file_execution_ids]
-        queryset = PageUsage.objects.filter(run_id__in=str_ids)
-        if not queryset.exists():
-            return None
-
-        result = queryset.aggregate(total_pages=Sum("pages_processed"))
-        return result.get("total_pages")
+        str_ids = [str(fid) for fid in file_execution_ids]
+        result = PageUsage.objects.filter(run_id__in=str_ids).aggregate(
+            total_pages=Sum("pages_processed")
+        )
+        return result.get("total_pages")
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/workflow_manager/workflow_v2/models/execution.py` around lines 274 - 280: the current code does a redundant .exists() followed by .aggregate(), which causes an extra DB round-trip; replace the two-step pattern in the block that builds str_ids and assigns queryset = PageUsage.objects.filter(run_id__in=str_ids) with a single aggregate call: PageUsage.objects.filter(run_id__in=str_ids).aggregate(total_pages=Sum("pages_processed")) and return result.get("total_pages") (this preserves None for no rows) — update the logic around the variables file_execution_ids, queryset and the use of Sum("pages_processed")/pages_processed to remove the .exists() check and avoid the double query.

backend/workflow_manager/execution/serializer/execution.py (1)
36-38: `aggregated_total_pages_processed` adds up to 3 extra DB queries per execution in list views.

The model property fires: (1) a `values_list` on `file_executions`, (2) a `PageUsage.exists()`, (3) a `PageUsage.aggregate()`. Combined with the existing per-item queries for `get_successful_files` and `get_failed_files`, execution list endpoints are now executing ~7 queries per row. The class-level TODO already calls this out; this field makes addressing it more urgent.

Consider annotating the aggregate in the queryset that feeds the list view (e.g., via a subquery annotation or a bulk `prefetch_related` approach) rather than resolving it lazily per object.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/workflow_manager/execution/serializer/execution.py` around lines 36 - 38: the get_aggregated_total_pages_processed serializer method calls the WorkflowExecution model property aggregated_total_pages_processed, which triggers multiple DB queries per row; instead annotate the queryset that feeds the list view with the aggregated total (using a Subquery/OuterRef or a bulk aggregation with Prefetch over file_executions/PageUsage) and change get_aggregated_total_pages_processed to return that annotated value (e.g., read obj.annotated_aggregated_total_pages_processed or similar) so no per-object queries are run; update the view/queryset that constructs the list to include the annotation name you choose and ensure the serializer reads that attribute rather than accessing the model property.

backend/usage_v2/helper.py (1)
37-45: Redundant `.exists()` check creates an unnecessary extra DB query.

`Sum` on an empty queryset already returns `None` for the aggregated key, so `result.get("total_pages")` will return `None` when there are no records — the `.exists()` guard is superfluous. The sibling method `get_aggregated_token_count` uses a single `.aggregate()` call for this reason.

Additionally, two static-analysis hints are valid here:
- BLE001: Replace bare `except Exception` with a narrower exception type, or at minimum annotate the intent.
- TRY400: `logger.error` suppresses the traceback; `logger.exception` (or `logger.error(..., exc_info=True)`) is preferred.

♻️ Proposed fix

```diff
-        try:
-            queryset = PageUsage.objects.filter(run_id=run_id)
-            if not queryset.exists():
-                return None
-            result = queryset.aggregate(total_pages=Sum("pages_processed"))
-            return result.get("total_pages")
-        except Exception as e:
-            logger.error(f"Error aggregating pages processed for run_id {run_id}: {e}")
-            return None
+        try:
+            result = PageUsage.objects.filter(run_id=run_id).aggregate(
+                total_pages=Sum("pages_processed")
+            )
+            return result.get("total_pages")
+        except Exception as e:  # noqa: BLE001
+            logger.exception(f"Error aggregating pages processed for run_id {run_id}: {e}")
+            return None
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/usage_v2/helper.py` around lines 37 - 45, Remove the redundant .exists() check and perform a single aggregate call on PageUsage.objects.filter(run_id=run_id) returning result.get("total_pages") (same pattern as get_aggregated_token_count); replace the bare except Exception with a narrower exception (e.g., catch django.db.DatabaseError) and log the failure with full traceback using logger.exception(...) (or logger.error(..., exc_info=True)) to preserve stack information while keeping the behavior of returning None on error.
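The key semantic behind these proposed fixes: Django's `Sum` over an empty queryset yields `None`, not `0`, so the `.exists()` guard never changes the result. A pure-Python model of that behavior (the `aggregate_total_pages` helper is hypothetical, shown only to illustrate the semantics):

```python
def aggregate_total_pages(pages_rows):
    """Models queryset.aggregate(total_pages=Sum("pages_processed")):
    an empty input yields None for the key, which is exactly what a
    prior .exists() check would have returned early anyway."""
    return {"total_pages": sum(pages_rows) if pages_rows else None}


print(aggregate_total_pages([3, 4]))  # {'total_pages': 7}
print(aggregate_total_pages([]))      # {'total_pages': None}
```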
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting
📒 Files selected for processing (5)
- backend/usage_v2/helper.py
- backend/workflow_manager/endpoint_v2/destination.py
- backend/workflow_manager/execution/serializer/execution.py
- backend/workflow_manager/workflow_v2/file_execution_tasks.py
- backend/workflow_manager/workflow_v2/models/execution.py
…ption handling - Unify UsageHelper.get_aggregated_pages_processed() to accept either run_id or run_ids, eliminating duplicate PageUsage query logic - WorkflowExecution.aggregated_total_pages_processed now delegates to UsageHelper instead of duplicating the aggregation - Remove broad try/except so exceptions bubble up to middleware Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Actionable comments posted: 1
🧹 Nitpick comments (1)
backend/usage_v2/helper.py (1)
49-52: Remove the redundant `exists()` check — it causes an extra DB round-trip.

Django's `Sum` on an empty queryset returns `{"total_pages": None}`, so `result.get("total_pages")` already returns `None` with no records present. The `exists()` guard adds a second database hit for every call without changing the outcome.

♻️ Proposed simplification

```diff
-        if not queryset.exists():
-            return None
-        result = queryset.aggregate(total_pages=Sum("pages_processed"))
-        return result.get("total_pages")
+        result = queryset.aggregate(total_pages=Sum("pages_processed"))
+        return result.get("total_pages")
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/usage_v2/helper.py` around lines 49 - 52, The current code performs an extra DB round-trip by calling queryset.exists() before aggregating; remove the exists() check and directly call queryset.aggregate(total_pages=Sum("pages_processed")) and return result.get("total_pages") — keep the aggregate on the same queryset (using Sum and "pages_processed") so empty querysets yield None without the extra exists() query.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/workflow_manager/workflow_v2/models/execution.py`:
- Around line 261-277: The aggregated_total_pages_processed property issues
per-object DB queries (file_executions.values_list(...) and
UsageHelper.get_aggregated_pages_processed(...)) causing an N+1 when used in
ExecutionSerializer list endpoints; fix by either adding a DB index on
PageUsage.run_id to speed each aggregate query and adding a migration for that
index, or (preferred) batch the totals in the list view/serializer by computing
aggregated page totals for all execution IDs in one query and attaching them to
the queryset (override the list view or ExecutionSerializer to accept a
precomputed map keyed by execution id and avoid calling
aggregated_total_pages_processed per instance); reference the property
aggregated_total_pages_processed, the call to file_executions.values_list, and
UsageHelper.get_aggregated_pages_processed when implementing the change.
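The annotated-value-with-fallback pattern described above can be sketched outside Django as follows (the attribute name `annotated_total_pages` and the classes are illustrative assumptions, not the project's actual names):

```python
_MISSING = object()


def read_total_pages(obj):
    """Prefer a value precomputed on the instance (e.g. set by a queryset
    annotation); fall back to the lazy per-object property only when absent.
    A sentinel keeps a legitimate 0 or None annotation from being skipped."""
    value = getattr(obj, "annotated_total_pages", _MISSING)
    if value is not _MISSING:
        return value
    return obj.aggregated_total_pages_processed


class FakeExecution:
    # Stands in for the model property that would otherwise hit the DB.
    aggregated_total_pages_processed = 7


annotated = FakeExecution()
annotated.annotated_total_pages = 12

print(read_total_pages(annotated))        # 12 (annotation wins, no "DB" work)
print(read_total_pages(FakeExecution()))  # 7 (fallback to the property)
```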
📒 Files selected for processing (2)
- backend/usage_v2/helper.py
- backend/workflow_manager/workflow_v2/models/execution.py
…tadata Resolve conflict: accept deletion of file_execution_tasks.py (dead code removed in #1777 after workers v2 migration). The API metadata enrichment change from that file is no longer needed as workers v2 handles destination processing differently. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
for more information, see https://pre-commit.ci
Test Results
Summary
Runner Tests - Full Report
SDK1 Tests - Full Report
♻️ Duplicate comments (1)
backend/workflow_manager/workflow_v2/models/execution.py (1)
262-276: ⚠️ Potential issue | 🟠 Major

N+1 query risk still exists in aggregated pages property.

This property still performs per-instance DB work (file execution ID lookup + pages aggregation). Since it is exposed on the execution list serializer, this scales poorly with list size.

⚡ Minimal model-side hook to support batched precomputation

```diff
 @property
 def aggregated_total_pages_processed(self) -> int | None:
+    if hasattr(self, "_aggregated_total_pages_processed"):
+        return self._aggregated_total_pages_processed
+
     file_execution_ids = list(self.file_executions.values_list("id", flat=True))
     if not file_execution_ids:
         return None
     return UsageHelper.get_aggregated_pages_processed(
         run_ids=[str(fid) for fid in file_execution_ids]
     )
```

Then set `_aggregated_total_pages_processed` in the list queryset/serializer precompute path to avoid per-row queries.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/workflow_manager/workflow_v2/models/execution.py` around lines 262 - 276, The aggregated_total_pages_processed property performs per-instance DB work via file_executions.values_list("id", flat=True) and UsageHelper.get_aggregated_pages_processed, causing N+1 queries when used in list serializers; change the property to first check for a precomputed attribute (e.g. _aggregated_total_pages_processed) and return it if present, and update the list queryset/serializer precompute path to compute aggregated pages for all executions in bulk (using the same UsageHelper.get_aggregated_pages_processed logic but passing all file_execution ids at once) and set each Execution instance's _aggregated_total_pages_processed before serialization so the property no longer hits the DB per instance.
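The batched precompute path that the hook enables can be sketched in plain Python (all names and the in-memory table are illustrative; in Django the bulk step would be a single grouped query over PageUsage rather than one query per execution):

```python
# run_id -> pages_processed; stands in for the PageUsage table.
PAGE_USAGE = {"f1": 3, "f2": 5, "f3": 2}


class ExecutionSketch:
    def __init__(self, file_execution_ids):
        self.file_execution_ids = file_execution_ids

    @property
    def aggregated_total_pages_processed(self):
        # Honor a precomputed value first, skipping per-instance lookups.
        if hasattr(self, "_aggregated_total_pages_processed"):
            return self._aggregated_total_pages_processed
        matched = [PAGE_USAGE[f] for f in self.file_execution_ids if f in PAGE_USAGE]
        return sum(matched) if matched else None


def precompute_totals(executions):
    """Fill the private attribute for a whole page of results up front."""
    for execution in executions:
        matched = [PAGE_USAGE[f] for f in execution.file_execution_ids if f in PAGE_USAGE]
        execution._aggregated_total_pages_processed = sum(matched) if matched else None


page = [ExecutionSketch(["f1", "f2"]), ExecutionSketch(["f3"]), ExecutionSketch([])]
precompute_totals(page)
print([e.aggregated_total_pages_processed for e in page])  # [8, 2, None]
```

After `precompute_totals`, serializing the list reads only cached attributes; the lazy branch remains as a fallback for detail views.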
📒 Files selected for processing (1)
backend/workflow_manager/workflow_v2/models/execution.py



What
- `UsageHelper.get_aggregated_pages_processed()` to aggregate page count from `PageUsage` model per `run_id`
- `aggregated_total_pages_processed` property on `WorkflowExecution` model to aggregate pages across all file executions
- `total_pages_processed` for API destinations in `_process_final_output()`
- `total_pages_processed` for DB destinations in `get_combined_metadata()`
- `aggregated_total_pages_processed` in `ExecutionSerializer` for the execution list API

Why
- `PageUsage` model already tracks `pages_processed` per `run_id` (file execution ID), but this data was not surfaced in API responses

How
- `usage_v2/helper.py`: Added `get_aggregated_pages_processed(run_id)` static method that queries `PageUsage.objects.filter(run_id=run_id)` and aggregates `Sum('pages_processed')`, returning `int | None`
- `workflow_v2/models/execution.py`: Added `aggregated_total_pages_processed` property that collects file execution IDs via `self.file_executions`, converts to strings, and queries `PageUsage` with `run_id__in`
- `workflow_v2/file_execution_tasks.py`: In `_process_final_output()`, after `destination.get_metadata()` for API destinations, injects `total_pages_processed` into `execution_metadata`
- `endpoint_v2/destination.py`: In `get_combined_metadata()`, adds `total_pages_processed` alongside existing `usage` token data
- `execution/serializer/execution.py`: Added `aggregated_total_pages_processed` as a `SerializerMethodField`

Can this PR break any existing features? If yes, please list possible items. If no, please explain why. (PS: Admins do not merge the PR without this section filled)
- `total_pages_processed` field gracefully returns `None` when no `PageUsage` data exists.

Database Migrations
Env Config
Relevant Docs
Related Issues or PRs
Dependencies Versions
Notes on Testing
- `total_pages_processed` appears in per-file response metadata
- `total_pages_processed` appears in combined metadata
- `aggregated_total_pages_processed` appears in the response
- `None` is returned gracefully when no `PageUsage` records exist for an execution

Screenshots
Checklist
I have read and understood the Contribution Guidelines.