-
Notifications
You must be signed in to change notification settings - Fork 5.8k
fix: capture per-task token usage to fix race condition in async execution #4286
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
fix: capture per-task token usage to fix race condition in async execution #4286
Conversation
…ution Fixes crewAIInc#4168 The per-task token tracking had a race condition when using threading-based async_execution (task.async_execution=True with crew.kickoff()). Multiple concurrent tasks from the same agent would get incorrect token attribution because tokens_before was captured when tasks were queued, but tokens_after was captured sequentially when processing futures. Changes: - Add token_usage field to TaskOutput to store per-task metrics - Add _get_agent_token_usage() helper to capture token snapshot from agent's LLM - Add _calculate_token_delta() helper to compute token differences - Modify _execute_task_async() to capture tokens within the thread immediately before and after task execution, ensuring accurate per-task attribution - Update _execute_core() and _aexecute_core() for consistent token tracking across all execution paths The fix captures token usage snapshots within the execution context (thread or async task) rather than after futures are resolved, ensuring each task gets its own accurate token count even when running concurrently.
98efb32 to
56c3d02
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cursor Bugbot has reviewed your changes and found 3 potential issues.
Bugbot Autofix is OFF. To automatically fix reported issues with Cloud Agents, enable Autofix in the Cursor dashboard.
This PR is being reviewed by Cursor Bugbot
Details
Your team is on the Bugbot Free tier. On this plan, Bugbot will review limited PRs each billing cycle for each member of your team.
To receive Bugbot reviews on all of your PRs, visit the Cursor dashboard to activate Pro and start your 14-day free trial.
| agent=agent.role, | ||
| output_format=self._get_output_format(), | ||
| messages=agent.last_messages, # type: ignore[attr-defined] | ||
| token_usage=token_delta, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Token usage lost when guardrails trigger task retry
Medium Severity
The token_usage tracking is incomplete. When _execute_core or _aexecute_core creates a TaskOutput with token_usage=token_delta, this value is lost if a guardrail fails and triggers a retry. The guardrail retry paths (_invoke_guardrail_function and _ainvoke_guardrail_function) create new TaskOutput objects without passing token_usage, resulting in token_usage=None for any task that goes through guardrail retry.
Additional Locations (1)
| tokens_after = self._get_agent_token_usage(agent or self.agent) | ||
|
|
||
| # Calculate and store the delta in the result | ||
| result.token_usage = self._calculate_token_delta(tokens_before, tokens_after) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Redundant token tracking in async execution path
Low Severity
Token tracking happens twice when _execute_task_async calls _execute_core. Both methods capture tokens_before and tokens_after, and both set token_usage. Since _execute_core returns a TaskOutput with token_usage already set, the subsequent overwrite in _execute_task_async is redundant. Both run in the same thread, so the "within thread" rationale doesn't justify the duplication.
Additional Locations (1)
| # Capture token usage after execution | ||
| tokens_after = self._get_agent_token_usage(agent) | ||
| token_delta = self._calculate_token_delta(tokens_before, tokens_after) | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Token capture misses output conversion LLM calls
Medium Severity
The tokens_after snapshot is captured BEFORE _export_output(result) is called, but _export_output can make LLM calls through the Converter class when converting task output to Pydantic/JSON models. When the raw result isn't valid JSON and needs LLM-assisted conversion, those additional LLM calls consume tokens that are not included in the task's token_usage. The token capture needs to happen after output conversion completes.
Additional Locations (1)
|
can you add a test for this please with the above test case you mentioned. please have @pytest.mark.vcr() on as well for the test |
Summary
Fixes #4168
The per-task token tracking had a race condition when using threading-based
async_execution(task.async_execution=Truewithcrew.kickoff()). Multiple concurrent tasks from the same agent would get incorrect token attribution becausetokens_beforewas captured when tasks were queued, buttokens_afterwas captured sequentially when processing futures.Problem
When multiple tasks with
async_execution=Trueare executed by the same agent:tokens_beforewas captured when the task was queuedtokens_beforevaluestokens_afterwas captured sequentially in_process_async_tasksafter callingfuture.result()Solution
Capture token usage snapshots within the execution context (thread or async task) rather than after futures are resolved:
token_usagefield toTaskOutputto store per-task metrics_get_agent_token_usage()helper to capture token snapshot from agent's LLM_calculate_token_delta()helper to compute token differences_execute_task_async()to capture tokens within the thread immediately before and after task execution_execute_core()and_aexecute_core()for consistent token tracking across all execution pathsFiles Changed
lib/crewai/src/crewai/task.py- Token tracking logic in task executionlib/crewai/src/crewai/tasks/task_output.py- Addtoken_usagefieldlib/crewai/tests/task/test_task_token_tracking.py- Tests for the fixNote
Adds accurate per-task token tracking and resolves async race conditions in token attribution.
token_usageonTaskOutputand importsUsageMetrics_get_agent_token_usage()and_calculate_token_delta()helpers_execute_task_async()to snapshot tokens before/after execution within the thread and store the delta on the result_execute_core()and_aexecute_core()so sync and async paths are consistentWritten by Cursor Bugbot for commit 56c3d02. This will update automatically on new commits. Configure here.