
feat(dag_command.py): change to use bulk clear #68280

Draft
FrankYang0529 wants to merge 2 commits into apache:main from FrankYang0529:airflow-67484

@FrankYang0529

Member

closes: #67484


Why

airflow dags clear (#66004) calls dag.clear(run_id=...) once per matched run, issuing a separate SELECT + flush for each. A wide --partition-date-* window or a broad --partition-key therefore costs N independent transactions, one per matched run.


How

Replace the per-run loop with a _bulk_clear_runs helper:

  • Iterates run_ids in chunks of _RUN_CHUNK_SIZE (500), fetching all matching TIs for each chunk with a single query.
  • Passes the entire chunk's TIs to clear_task_instances in one call.
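
The chunking described above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the PR's code: `fetch_tis` and `clear_tis` are hypothetical stand-ins for the per-chunk query and for `clear_task_instances`, and the real helper's signature may differ.

```python
from collections.abc import Callable, Sequence

_RUN_CHUNK_SIZE = 500  # chunk size named in the PR description


def bulk_clear_runs(
    run_ids: Sequence[str],
    fetch_tis: Callable[[Sequence[str]], list],  # hypothetical: one query per chunk
    clear_tis: Callable[[list], None],  # hypothetical stand-in for clear_task_instances
    chunk_size: int = _RUN_CHUNK_SIZE,
) -> int:
    """Clear task instances chunk by chunk and return how many were cleared."""
    cleared = 0
    for start in range(0, len(run_ids), chunk_size):
        chunk = run_ids[start : start + chunk_size]
        tis = fetch_tis(chunk)  # one fetch covers every run in the chunk
        if tis:
            clear_tis(tis)  # one clear call per chunk, not per run
            cleared += len(tis)
    return cleared
```

With this shape, N matched runs cost roughly ceil(N / chunk_size) fetch-and-clear rounds instead of N.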

Two new tests:

  • test_clears_multiple_runs_in_one_batch: asserts that 3 runs fitting in one chunk result in exactly one clear_task_instances call.
  • test_chunks_on_run_boundaries_clears_each_run_once: patches _RUN_CHUNK_SIZE to 2 so that 3 runs span 2 chunks, and checks that each run is still cleared exactly once.
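
The two assertions could look something like the sketch below. It is only an approximation of the idea: `bulk_clear` is a hypothetical stand-in for the helper under test, and the chunk size is passed as an argument here, whereas the PR's tests patch `_RUN_CHUNK_SIZE`.

```python
from unittest import mock


# Hypothetical stand-in for the helper under test; the real one queries the DB.
def bulk_clear(run_ids, clear, chunk_size):
    for start in range(0, len(run_ids), chunk_size):
        clear(run_ids[start : start + chunk_size])


def test_clears_multiple_runs_in_one_batch():
    clear = mock.Mock()
    bulk_clear(["r1", "r2", "r3"], clear, chunk_size=500)
    # All three runs fit in one chunk, so exactly one clear call is expected.
    clear.assert_called_once_with(["r1", "r2", "r3"])


def test_chunks_on_run_boundaries_clears_each_run_once():
    clear = mock.Mock()
    bulk_clear(["r1", "r2", "r3"], clear, chunk_size=2)
    # Three runs with a chunk size of 2 span two chunks.
    assert clear.call_count == 2
    # Every run appears exactly once across the chunks.
    seen = [run_id for call in clear.call_args_list for run_id in call.args[0]]
    assert sorted(seen) == ["r1", "r2", "r3"]
```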

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

Claude Code with Claude Opus 4.8



Signed-off-by: PoAn Yang <payang@apache.org>
@FrankYang0529 FrankYang0529 marked this pull request as draft June 9, 2026 11:44
    session: Session,
) -> int:
    """Clear task instances for the given run_ids in chunks instead of one transaction per run."""
    from airflow.utils.state import TaskInstanceState

Member

Let's not do inline import

Member Author

Sorry about that. This isn't one of the valid exceptions, so I moved the import to the top of the file.

- Imports at top of file. Valid exceptions: circular imports, lazy loading for worker isolation, `TYPE_CHECKING` blocks.

    cleared = 0
    for chunk_start in range(0, len(run_ids), _RUN_CHUNK_SIZE):
        chunk_run_ids = run_ids[chunk_start : chunk_start + _RUN_CHUNK_SIZE]
        ti_query = select(TaskInstance).where(TaskInstance.run_id.in_(chunk_run_ids))

Member

we should also filter by dag_id

Member Author

Good catch. Updated it.
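
To illustrate why the dag_id filter matters: a run_id is only unique within a DAG, so two DAGs can share the same run_id, and a query on run_id alone would pick up task instances from both. The sketch below shows this with the standard-library `sqlite3` module and a deliberately simplified, hypothetical `task_instance` table; it is not Airflow's schema or the PR's query.

```python
import sqlite3

# Toy table for illustration only; Airflow's real task_instance table has more columns.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (dag_id TEXT, run_id TEXT, task_id TEXT)")
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?)",
    [
        ("dag_a", "scheduled__2026-01-01", "extract"),
        ("dag_b", "scheduled__2026-01-01", "extract"),  # same run_id, different DAG
    ],
)


def select_tis(run_ids, dag_id=None):
    """Return (dag_id, task_id) rows for the given run_ids, optionally scoped to one DAG."""
    placeholders = ", ".join("?" for _ in run_ids)
    sql = f"SELECT dag_id, task_id FROM task_instance WHERE run_id IN ({placeholders})"
    params = list(run_ids)
    if dag_id is not None:
        sql += " AND dag_id = ?"
        params.append(dag_id)
    return conn.execute(sql, params).fetchall()


unfiltered = select_tis(["scheduled__2026-01-01"])  # matches rows from both DAGs
filtered = select_tis(["scheduled__2026-01-01"], dag_id="dag_a")  # only dag_a
```

Without the extra predicate, clearing runs of one DAG could also clear same-named runs of an unrelated DAG.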

Signed-off-by: PoAn Yang <payang@apache.org>
Development

Successfully merging this pull request may close these issues.

Bulk-clear path for airflow dags clear to avoid N transactions

2 participants