Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/610.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added `ref executions reingest` command to re-ingest existing execution results without re-running diagnostics. Supports three modes: `additive` (keep existing values, add new), `replace` (delete and re-ingest), and `versioned` (create new execution record).
2 changes: 1 addition & 1 deletion packages/climate-ref-core/src/climate_ref_core/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def get_env() -> Env:

# Load the environment variables from the .env file
# This will override any defaults set above
env.read_env(verbose=True)
env.read_env()

return env

Expand Down
134 changes: 134 additions & 0 deletions packages/climate-ref/src/climate_ref/cli/executions.py
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,140 @@ def stats(
pretty_print_df(results_df, console=console)


@app.command()
def reingest(  # noqa: PLR0913
    ctx: typer.Context,
    group_ids: Annotated[
        list[int] | None,
        typer.Argument(help="Execution group IDs to reingest. If omitted, uses filters."),
    ] = None,
    provider: Annotated[
        list[str] | None,
        typer.Option(
            help="Filter by provider slug (substring match, case-insensitive). "
            "Multiple values can be provided."
        ),
    ] = None,
    diagnostic: Annotated[
        list[str] | None,
        typer.Option(
            help="Filter by diagnostic slug (substring match, case-insensitive). "
            "Multiple values can be provided."
        ),
    ] = None,
    include_failed: Annotated[
        bool,
        typer.Option(
            "--include-failed",
            help="Also attempt reingest on failed executions.",
        ),
    ] = False,
    dry_run: Annotated[
        bool,
        typer.Option(
            "--dry-run",
            help="Show what would be reingested without making changes.",
        ),
    ] = False,
    force: Annotated[
        bool,
        typer.Option(help="Skip confirmation prompt"),
    ] = False,
) -> None:
    """
    Reingest existing executions without re-running diagnostics.

    Re-runs build_execution_result() on existing output files and re-ingests
    the results into the database. Useful when new series definitions or
    metadata extraction logic has been added.

    A new Execution record is always created under the same ExecutionGroup,
    leaving the original execution untouched. Results are treated as immutable.

    The dirty flag is never modified by this command.
    """
    # Imported lazily so the CLI module stays cheap to import.
    import pandas as pd

    from climate_ref.executor.reingest import (
        get_executions_for_reingest,
        reingest_execution,
    )
    from climate_ref.provider_registry import ProviderRegistry

    config: Config = ctx.obj.config
    db = ctx.obj.database
    console = ctx.obj.console

    # Refuse to run without any selection criteria: an unfiltered reingest
    # would touch every execution in the database.
    if not any([group_ids, provider, diagnostic]):
        logger.error(
            "At least one filter is required (group IDs, --provider, or --diagnostic). "
            "This prevents accidental reingest of all executions."
        )
        raise typer.Exit(code=1)

    provider_registry = ProviderRegistry.build_from_config(config, db)

    results = get_executions_for_reingest(
        db,
        execution_group_ids=group_ids,
        provider_filters=provider,
        diagnostic_filters=diagnostic,
        include_failed=include_failed,
    )

    if not results:
        console.print("No executions found matching the specified criteria.")
        return

    # Summary table shown for both the dry-run and the confirmation prompt.
    preview_df = pd.DataFrame(
        [
            {
                "group_id": eg.id,
                "execution_id": ex.id,
                "provider": eg.diagnostic.provider.slug,
                "diagnostic": eg.diagnostic.slug,
                "key": eg.key,
                "successful": ex.successful,
            }
            for eg, ex in results
        ]
    )

    if dry_run:
        console.print(f"[bold]Dry run:[/] would reingest {len(results)} execution(s):")
        pretty_print_df(preview_df, console=console)
        return

    console.print(f"Will reingest {len(results)} execution(s):")
    pretty_print_df(preview_df, console=console)

    if not force:
        if not typer.confirm("\nProceed with reingest?"):
            console.print("Reingest cancelled.")
            return

    # Ensure any autobegun transaction from the preview queries is closed
    # so each reingest runs in its own top-level transaction (not a savepoint).
    if db.session.in_transaction():
        db.session.commit()

    # Process each execution in a separate transaction so one failure does
    # not roll back the executions already reingested.
    success_count = 0
    skip_count = 0
    for eg, ex in results:
        with db.session.begin():
            ok = reingest_execution(
                config=config,
                database=db,
                execution=ex,
                provider_registry=provider_registry,
            )

        if ok:
            success_count += 1
        else:
            skip_count += 1

    console.print(f"\n[green]Reingest complete:[/] {success_count} succeeded, {skip_count} skipped.")


@app.command()
def flag_dirty(ctx: typer.Context, execution_id: int) -> None:
"""
Expand Down
4 changes: 2 additions & 2 deletions packages/climate-ref/src/climate_ref/data_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,13 @@ def finalise(self, subset: pd.DataFrame) -> pd.DataFrame:
Finalise unfinalised datasets in the given subset.

If the adapter supports finalization (implements FinaliseableDatasetAdapterMixin),
unfinalised datasets in the subset are finalized by opening their files.
unfinalised datasets in the subset are finalised by opening their files.
The internal cache and database are updated accordingly.

Parameters
----------
subset
DataFrame subset to finalize (typically after filter+group_by)
DataFrame subset to finalise (typically after filter+group_by)

Returns
-------
Expand Down
7 changes: 6 additions & 1 deletion packages/climate-ref/src/climate_ref/executor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,9 @@
from .result_handling import handle_execution_result
from .synchronous import SynchronousExecutor

__all__ = ["HPCExecutor", "LocalExecutor", "SynchronousExecutor", "handle_execution_result"]
__all__ = [
"HPCExecutor",
"LocalExecutor",
"SynchronousExecutor",
"handle_execution_result",
]
44 changes: 44 additions & 0 deletions packages/climate-ref/src/climate_ref/executor/fragment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
"""
Helpers for allocating non-colliding output fragment paths.
"""

import pathlib


def allocate_output_fragment(
    base_fragment: str,
    existing_fragments: set[str],
    results_dir: pathlib.Path,
) -> str:
    """
    Pick an output fragment path that does not clash with any existing one.

    The natural *base_fragment* is used as-is when it is free. When it is
    already taken — either recorded in *existing_fragments* or present as a
    directory under *results_dir* — versioned suffixes (``_v2``, ``_v3``, ...)
    are probed in order and the first free candidate is returned.

    Parameters
    ----------
    base_fragment
        The natural fragment, e.g. ``provider/diagnostic/dataset_hash``
    existing_fragments
        Set of ``output_fragment`` values already recorded in the database
        for the relevant execution group
    results_dir
        Root results directory; used to check for orphaned directories on disk

    Returns
    -------
    :
        A fragment string guaranteed not to collide with *existing_fragments*
        or any directory under *results_dir*
    """

    def _taken(fragment: str) -> bool:
        # A fragment is unavailable if the database knows about it or a
        # (possibly orphaned) directory already exists on disk.
        return fragment in existing_fragments or (results_dir / fragment).exists()

    if not _taken(base_fragment):
        return base_fragment

    suffix = 2
    while _taken(f"{base_fragment}_v{suffix}"):
        suffix += 1
    return f"{base_fragment}_v{suffix}"
Loading
Loading