feat(cli): add reingest command for re-ingesting execution results #610
lewisjared wants to merge 4 commits into main from
Conversation
Add `ref executions reingest` command that re-runs build_execution_result() on existing outputs without re-executing diagnostics. Uses ReingestMode enum for the --mode option, providing tab completion and validated choices (additive, replace, versioned) via Typer's native enum support.
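The enum pattern the description refers to can be sketched like this (a minimal sketch using the mode names from the description; Typer renders a `str`-backed enum as validated `--mode` choices with tab completion):

```python
import enum


class ReingestMode(str, enum.Enum):
    # Typer derives the allowed --mode values from these members.
    additive = "additive"      # keep existing rows, add new ones
    replace = "replace"        # delete existing rows, then re-ingest
    versioned = "versioned"    # create a new execution record


# A str-backed enum round-trips from the raw CLI string:
mode = ReingestMode("replace")
```

Because the enum inherits from `str`, each member compares equal to its string value, which is what lets Typer validate and display the choices in `--help`.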
…d scratch cleanup

Collapse four near-identical metric ingestion functions into two by adding an optional `existing` parameter for additive dedup. Hoist the dimension query into `_ingest_metrics` to avoid duplicate DB queries. Fix the double iteration of the `iter_results()` generator. Add try/finally scratch directory cleanup to prevent disk accumulation during batch reingests. Remove the redundant dirty-flag save/restore in the CLI.
Pull request overview
Adds a new “reingest” workflow to the Climate REF CLI/executor layer to re-run build_execution_result() against existing on-disk outputs and ingest updated metrics/metadata into the DB without re-executing diagnostics.
Changes:
- Introduces `climate_ref.executor.reingest` with `ReingestMode` and reingest/query helpers.
- Adds the `ref executions reingest` CLI command with safety guards (filters required, confirm prompt, dry-run) and mode selection.
- Adds unit tests for reingest modes and filtering, plus a changelog entry.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| packages/climate-ref/src/climate_ref/executor/reingest.py | Implements reingest logic, scratch copying, savepoint-based DB mutation, and execution querying. |
| packages/climate-ref/src/climate_ref/cli/executions.py | Adds ref executions reingest command wiring and UX/safety features. |
| packages/climate-ref/src/climate_ref/executor/\_\_init\_\_.py | Exposes ReingestMode and reingest_execution in executor exports. |
| packages/climate-ref/tests/unit/executor/test_reingest.py | New unit tests covering reingest behavior across modes and filters. |
| changelog/610.feature.md | Documents the new CLI command and modes. |
```python
for key, output_info in outputs.items():
    filename = ensure_relative_path(output_info.filename, results_base)
```
_handle_reingest_outputs() calls ensure_relative_path(output_info.filename, results_base), but the output bundle was produced in the scratch directory during re-extraction. If output_info.filename is an absolute path under scratch (as supported by ensure_relative_path and used in result_handling._handle_outputs), this will raise ValueError and abort reingest. Use the scratch output directory as the root for ensure_relative_path (or pass it in), and still store the resulting relative filename in the DB.
Suggested change:

```python
# During re-extraction, output bundles may be produced under the scratch
# directory. In that case, output_info.filename can be an absolute path
# under scratch, which would cause ensure_relative_path(..., results_base)
# to raise ValueError. Fall back to using the scratch output directory as
# the root for ensure_relative_path while still storing the resulting
# relative path in the database.
scratch_base = config.paths.scratch / execution.output_fragment
for key, output_info in outputs.items():
    try:
        filename = ensure_relative_path(output_info.filename, results_base)
    except ValueError:
        filename = ensure_relative_path(output_info.filename, scratch_base)
```
```python
if result.output_bundle_filename:
    if mode != ReingestMode.additive:
        database.session.execute(
            delete(ExecutionOutput).where(ExecutionOutput.execution_id == target_execution.id)
        )
    _handle_reingest_output_bundle(
        config,
        database,
        target_execution,
        result.to_output_path(result.output_bundle_filename),
    )
```
In additive mode, existing ExecutionOutput rows are not deleted (mode != additive guard), but new outputs are still inserted from the output bundle. Re-running additive reingest will therefore accumulate duplicate outputs, and it also contradicts the CLI help text that says outputs are replaced. Either always clear outputs before inserting, or add dedup/upsert logic for outputs in additive mode.
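A minimal sketch of what the suggested dedup could look like, assuming `(output_type, filename)` as the natural key for an output; the dataclass below stands in for the real `ExecutionOutput` model and the helper name is illustrative:

```python
from dataclasses import dataclass


@dataclass
class Output:
    # Stand-in for ExecutionOutput; the real model is a SQLAlchemy class.
    output_type: str
    filename: str
    short_name: str


def dedup_additive(existing: list[Output], incoming: list[Output]) -> list[Output]:
    """Return only the incoming outputs not already present.

    Keyed on (output_type, filename) — an assumed natural key — so that
    re-running an additive reingest does not insert duplicate rows.
    """
    seen = {(o.output_type, o.filename) for o in existing}
    return [o for o in incoming if (o.output_type, o.filename) not in seen]
```

The alternative the review mentions, always clearing outputs before inserting, avoids the need for a key entirely at the cost of losing any rows the bundle no longer produces.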
```python
if mode == ReingestMode.versioned:
    version_hash = hashlib.sha1(  # noqa: S324
        f"{execution.output_fragment}-reingest-{execution.id}".encode()
    ).hexdigest()[:12]

    target_execution = Execution(
        execution_group=execution_group,
        dataset_hash=execution.dataset_hash,
        output_fragment=f"{execution.output_fragment}_v{version_hash}",
    )
```
Versioned mode derives output_fragment from a deterministic hash of the original fragment and execution.id. Re-running versioned reingest for the same execution will generate the same output_fragment, overwriting the previous versioned results directory and making multiple “versions” impossible. Consider generating a unique fragment per run (e.g., include the new Execution.id after flush, a timestamp, or a UUID).
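One way to make the fragment unique per run, following the reviewer's UUID suggestion (the helper name is illustrative, not the PR's API):

```python
import uuid


def versioned_fragment(output_fragment: str) -> str:
    """Derive a results fragment that is unique per reingest run.

    Unlike the deterministic sha1 of (fragment, execution.id), a random
    UUID suffix changes on every call, so repeated versioned reingests
    land in distinct directories instead of overwriting each other.
    """
    return f"{output_fragment}_v{uuid.uuid4().hex[:12]}"
```

A timestamp or the new `Execution.id` after flush would work equally well; the only requirement is that two runs for the same source execution never collide.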
```python
if mode == ReingestMode.versioned:
    assert result.metric_bundle_filename is not None
```
Only versioned mode calls target_execution.mark_successful(...). In additive/replace, a previously-failed execution (possible via --include-failed) will remain successful=False and path may stay unset/stale even after successfully ingesting new metric bundles/values. Consider updating successful/path on success for all modes (or at least when execution.successful is not already True).
Suggested change:

```python
# Mark the target execution as successful when we have a metric bundle.
# For versioned mode, always update (preserving existing behavior).
# For other modes, update at least when the execution is not already successful
# so that previously failed executions can be corrected on successful reingest.
if result.metric_bundle_filename is not None and (
    mode == ReingestMode.versioned or not getattr(target_execution, "successful", False)
):
```
```python
src = config.paths.results / output_fragment
dst = config.paths.scratch / output_fragment
if dst.exists():
    shutil.rmtree(dst)
shutil.copytree(src, dst)
return dst
```
_copy_results_to_scratch() (and later copy/delete operations) uses output_fragment from the DB to construct paths and then calls rmtree/copytree without verifying the computed paths stay under config.paths.scratch/results. If output_fragment is absolute or contains traversal, Path joining can escape the base directory and lead to deleting/copying arbitrary paths. Add safety checks similar to cli/executions.py (e.g., ensure dst.is_relative_to(config.paths.scratch) and src.is_relative_to(config.paths.results) before rmtree/copytree).
```python
# Copy the results directory to scratch so that build_execution_result()
# can write CMEC bundles without mutating the live results tree.
# If anything fails, the original files remain untouched.
scratch_dir = _copy_results_to_scratch(config, execution.output_fragment)
```
Scratch directory is fixed to config.paths.scratch / output_fragment and is deleted/recreated. This can race if multiple reingests run concurrently, and it can also conflict with a normal execution using the same output_fragment (since standard execution also uses scratch/output_fragment). Using a unique temporary scratch subdirectory per reingest run (and pointing ExecutionDefinition.output_directory at it) would avoid collisions and reduce the need for rmtree().
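A sketch of the per-run scratch directory idea; the names are illustrative, not the PR's API:

```python
import tempfile
from pathlib import Path


def make_reingest_scratch(scratch_root: Path, output_fragment: str) -> Path:
    """Create a unique per-run scratch subdirectory for a reingest.

    mkdtemp guarantees a fresh directory on every call, so concurrent
    reingests cannot collide with each other or with a normal execution
    writing to scratch/<output_fragment>, and no rmtree of a shared
    path is needed up front.
    """
    run_root = Path(tempfile.mkdtemp(prefix="reingest-", dir=scratch_root))
    target = run_root / output_fragment
    target.mkdir(parents=True)
    return target
```

`ExecutionDefinition.output_directory` would then point at the returned path, and the try/finally cleanup removes `run_root` after ingestion.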
```python
console.print("Reingest cancelled.")
return

# Process each execution in a separate session
```
Comment says “separate session” but the code is using the same db.session and just opening a new transaction per execution with session.begin(). Consider rewording to “separate transaction” to avoid confusion about session/connection lifecycle.
Suggested change:

```diff
- # Process each execution in a separate session
+ # Process each execution in a separate transaction
```
```python
ExecutionOutput(
    execution_id=execution.id,
    output_type="plot",
    filename="test.png",
    short_name="test",
)
```
TestDeleteExecutionResults creates ExecutionOutput with output_type="plot" (string). ExecutionOutput.output_type is typed as ResultOutputType, and other tests use ResultOutputType.Plot; using a raw string may fail depending on SQLAlchemy Enum coercion. Prefer constructing with ResultOutputType.Plot for consistency and to avoid backend-dependent behavior.
Codecov Report

❌ Patch coverage is …

Flags with carried forward coverage won't be shown.

... and 1 file with indirect coverage changes
Description
Add a `ref executions reingest` command that re-runs `build_execution_result()` on existing output files and re-ingests the results into the database without re-executing diagnostics. This is useful when new series definitions or metadata extraction logic has been added.

Key features:

- `ReingestMode` enum: `additive` (keep existing, add new), `replace` (delete and re-ingest), `versioned` (new execution record)
- `--mode` option for better UX (tab completion, validated choices shown in help)

Checklist
Please confirm that this pull request has done the following:
changelog/